Commit 9c32d32

Always process holding cell ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then.

This adds a substantial amount of complexity in very critical codepaths.

Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete.

This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later.

This commit updates the `ChannelMonitorUpdate` pipeline when freeing the holding cell, including when initiating shutdown.
1 parent c676f58 commit 9c32d32
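
Below is a minimal, self-contained sketch of the queueing pattern described above. It uses simplified stand-in types rather than the real LDK `Channel`/`ChannelManager` API, and shows a channel method pushing a `ChannelMonitorUpdate` onto a per-channel queue, returning a reference to it, and the caller later marking it complete once persistence finishes.

// Hypothetical, simplified types for illustration only; the real LDK structs
// carry far more state, and the update steps themselves are elided here.
#[derive(Debug)]
struct ChannelMonitorUpdate {
    update_id: u64,
}

#[derive(Default)]
struct Channel {
    latest_monitor_update_id: u64,
    // Queue of updates handed to the manager but not yet known to be persisted.
    pending_monitor_updates: Vec<ChannelMonitorUpdate>,
    // Stand-in for the MonitorUpdateInProgress flag: while set, outbound
    // messages are held back and regenerated once the update completes.
    monitor_update_in_progress: bool,
}

impl Channel {
    // Queue a new update and hand back a reference for the caller to apply,
    // mirroring the "set the flag, return only a ChannelMonitorUpdate" flow.
    fn queue_monitor_update(&mut self) -> &ChannelMonitorUpdate {
        self.latest_monitor_update_id += 1;
        self.monitor_update_in_progress = true;
        self.pending_monitor_updates.push(ChannelMonitorUpdate {
            update_id: self.latest_monitor_update_id,
        });
        self.pending_monitor_updates.last().unwrap()
    }

    // Called once persistence of `update_id` (and everything before it) is
    // done; drops completed entries and clears the in-progress flag.
    fn monitor_update_completed(&mut self, update_id: u64) {
        self.pending_monitor_updates.retain(|upd| upd.update_id > update_id);
        if self.pending_monitor_updates.is_empty() {
            self.monitor_update_in_progress = false;
        }
    }
}

fn main() {
    let mut chan = Channel::default();
    let update_id = chan.queue_monitor_update().update_id;
    // ...the manager would pass the queued update to the chain monitor here;
    // once the persister reports completion:
    chan.monitor_update_completed(update_id);
    assert!(chan.pending_monitor_updates.is_empty());
    assert!(!chan.monitor_update_in_progress);
}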

File tree

3 files changed: +77 -83 lines changed

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 9 additions & 1 deletion
@@ -2600,7 +2600,15 @@ fn test_permanent_error_during_sending_shutdown() {
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
 
 	assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
-	check_closed_broadcast!(nodes[0], true);
+
+	// We always send the `shutdown` response when initiating a shutdown, even if we immediately
+	// close the channel thereafter.
+	let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+	assert_eq!(msg_events.len(), 3);
+	if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
+	if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
+	if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }
+
 	check_added_monitors!(nodes[0], 2);
 	check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
 }

lightning/src/ln/channel.rs

Lines changed: 25 additions & 29 deletions
@@ -3186,16 +3186,16 @@ impl<Signer: Sign> Channel<Signer> {
 	/// Public version of the below, checking relevant preconditions first.
 	/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
 	/// returns `(None, Vec::new())`.
-	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		if self.channel_state >= ChannelState::ChannelReady as u32 &&
 		   (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
 			self.free_holding_cell_htlcs(logger)
-		} else { Ok((None, Vec::new())) }
+		} else { (None, Vec::new()) }
 	}
 
 	/// Frees any pending commitment updates in the holding cell, generating the relevant messages
 	/// for our counterparty.
-	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
 		if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
 			log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(),
@@ -3276,16 +3276,16 @@ impl<Signer: Sign> Channel<Signer> {
 				}
 			}
 			if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
-				return Ok((None, htlcs_to_fail));
+				return (None, htlcs_to_fail);
 			}
 			let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
 				self.send_update_fee(feerate, false, logger)
 			} else {
 				None
 			};
 
-			let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
-			// send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
+			let mut additional_update = self.build_commitment_no_status_check(logger);
+			// build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
 			// but we want them to be strictly increasing by one, so reset it here.
 			self.latest_monitor_update_id = monitor_update.update_id;
 			monitor_update.updates.append(&mut additional_update.updates);
@@ -3294,16 +3294,11 @@ impl<Signer: Sign> Channel<Signer> {
 				log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" },
 				update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
 
-			Ok((Some((msgs::CommitmentUpdate {
-				update_add_htlcs,
-				update_fulfill_htlcs,
-				update_fail_htlcs,
-				update_fail_malformed_htlcs: Vec::new(),
-				update_fee,
-				commitment_signed,
-			}, monitor_update)), htlcs_to_fail))
+			self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+			self.pending_monitor_updates.push(monitor_update);
+			(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
 		} else {
-			Ok((None, Vec::new()))
+			(None, Vec::new())
 		}
 	}
 
@@ -3513,17 +3508,9 @@ impl<Signer: Sign> Channel<Signer> {
 			return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
 		}
 
-		match self.free_holding_cell_htlcs(logger)? {
-			(Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
-				commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
-				for fail_msg in update_fail_htlcs.drain(..) {
-					commitment_update.update_fail_htlcs.push(fail_msg);
-				}
-				commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len());
-				for fail_msg in update_fail_malformed_htlcs.drain(..) {
-					commitment_update.update_fail_malformed_htlcs.push(fail_msg);
-				}
-
+		match self.free_holding_cell_htlcs(logger) {
+			(Some(_), htlcs_to_fail) => {
+				let mut additional_update = self.pending_monitor_updates.pop().unwrap();
 				// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
 				// strictly increasing by one, so decrement it here.
 				self.latest_monitor_update_id = monitor_update.update_id;
@@ -5843,8 +5830,11 @@ impl<Signer: Sign> Channel<Signer> {
 
 	/// Begins the shutdown process, getting a message for the remote peer and returning all
 	/// holding cell HTLCs for payment failure.
+	///
+	/// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]) in which case no
+	/// [`ChannelMonitorUpdate`] will be returned).
 	pub fn get_shutdown<K: Deref>(&mut self, keys_provider: &K, their_features: &InitFeatures, target_feerate_sats_per_kw: Option<u32>)
-	-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+	-> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
 	where K::Target: KeysInterface {
 		for htlc in self.pending_outbound_htlcs.iter() {
 			if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5887,12 +5877,15 @@ impl<Signer: Sign> Channel<Signer> {
 
 		let monitor_update = if update_shutdown_script {
 			self.latest_monitor_update_id += 1;
-			Some(ChannelMonitorUpdate {
+			let monitor_update = ChannelMonitorUpdate {
 				update_id: self.latest_monitor_update_id,
 				updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
 					scriptpubkey: self.get_closing_scriptpubkey(),
 				}],
-			})
+			};
+			self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+			self.pending_monitor_updates.push(monitor_update);
+			Some(self.pending_monitor_updates.last().unwrap())
 		} else { None };
 		let shutdown = msgs::Shutdown {
 			channel_id: self.channel_id,
@@ -5913,6 +5906,9 @@ impl<Signer: Sign> Channel<Signer> {
 			}
 		});
 
+		debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
+			"we can't both complete shutdown and return a monitor update");
+
 		Ok((shutdown, monitor_update, dropped_outbound_htlcs))
 	}
 
lightning/src/ln/channelmanager.rs

Lines changed: 43 additions & 53 deletions
@@ -1774,28 +1774,30 @@ where
 		}
 
 		let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-		let peer_state = &mut *peer_state_lock;
+		let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
 		match peer_state.channel_by_id.entry(channel_id.clone()) {
 			hash_map::Entry::Occupied(mut chan_entry) => {
-				let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.keys_manager, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?;
+				let funding_txo_opt = chan_entry.get().get_funding_txo();
+				let their_features = &peer_state.latest_features;
+				let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
+					.get_shutdown(&self.keys_manager, their_features, target_feerate_sats_per_1000_weight)?;
 				failed_htlcs = htlcs;
 
-				// Update the monitor with the shutdown script if necessary.
-				if let Some(monitor_update) = monitor_update {
-					let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-					let (result, is_permanent) =
-						handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-					if is_permanent {
-						remove_channel!(self, chan_entry);
-						break result;
-					}
-				}
-
+				// We can send the `shutdown` message before updating the `ChannelMonitor`
+				// here as we don't need the monitor update to complete until we send a
+				// `shutdown_signed`, which we'll delay if we're pending a monitor update.
 				peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
 					node_id: *counterparty_node_id,
-					msg: shutdown_msg
+					msg: shutdown_msg,
 				});
 
+				// Update the monitor with the shutdown script if necessary.
+				if let Some(monitor_update) = monitor_update_opt.take() {
+					let update_id = monitor_update.update_id;
+					let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+					break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+				}
+
 				if chan_entry.get().is_shutdown() {
 					let channel = remove_channel!(self, chan_entry);
 					if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
@@ -5060,49 +5062,37 @@ where
 		let mut has_monitor_update = false;
 		let mut failed_htlcs = Vec::new();
 		let mut handle_errors = Vec::new();
-		{
-			let per_peer_state = self.per_peer_state.read().unwrap();
+		let per_peer_state = self.per_peer_state.read().unwrap();
 
-			for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+		for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+			'chan_loop: loop {
 				let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-				let peer_state = &mut *peer_state_lock;
-				let pending_msg_events = &mut peer_state.pending_msg_events;
-				peer_state.channel_by_id.retain(|channel_id, chan| {
-					match chan.maybe_free_holding_cell_htlcs(&self.logger) {
-						Ok((commitment_opt, holding_cell_failed_htlcs)) => {
-							if !holding_cell_failed_htlcs.is_empty() {
-								failed_htlcs.push((
-									holding_cell_failed_htlcs,
-									*channel_id,
-									chan.get_counterparty_node_id()
-								));
-							}
-							if let Some((commitment_update, monitor_update)) = commitment_opt {
-								match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
-									ChannelMonitorUpdateStatus::Completed => {
-										pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-											node_id: chan.get_counterparty_node_id(),
-											updates: commitment_update,
-										});
-									},
-									e => {
-										has_monitor_update = true;
-										let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
-										handle_errors.push((chan.get_counterparty_node_id(), res));
-										if close_channel { return false; }
-									},
-								}
-							}
-							true
-						},
-						Err(e) => {
-							let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
-							handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
-							// ChannelClosed event is generated by handle_error for us
-							!close_channel
+				let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+				for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
+					let counterparty_node_id = chan.get_counterparty_node_id();
+					let funding_txo = chan.get_funding_txo();
+					let (monitor_opt, holding_cell_failed_htlcs) =
+						chan.maybe_free_holding_cell_htlcs(&self.logger);
+					if !holding_cell_failed_htlcs.is_empty() {
+						failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+					}
+					if let Some(monitor_update) = monitor_opt {
+						has_monitor_update = true;
+
+						let update_res = self.chain_monitor.update_channel(
+							funding_txo.expect("channel is live"), monitor_update);
+						let update_id = monitor_update.update_id;
+						let channel_id: [u8; 32] = *channel_id;
+						let res = handle_new_monitor_update!(self, update_res, update_id,
+							peer_state_lock, peer_state, chan, MANUALLY_REMOVING,
+							peer_state.channel_by_id.remove(&channel_id));
+						if res.is_err() {
+							handle_errors.push((counterparty_node_id, res));
 						}
+						continue 'chan_loop;
 					}
-				});
+				}
+				break 'chan_loop;
 			}
 		}
 
