Commit 53670ba

Always process holding cell ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths.

Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete.

This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later.

This commit updates the `ChannelMonitorUpdate` pipeline when freeing the holding cell, including when initiating shutdown.
1 parent 680ecf9 commit 53670ba
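
To make the control flow concrete, here is a minimal, self-contained sketch of the pattern this commit moves toward. The names (`ChannelSketch`, `MonitorUpdate`, `queue_monitor_update`, `monitor_update_completed`) are illustrative only and are not LDK APIs: the channel pushes the new update onto its own queue, flags itself as blocked on monitor persistence, and hands the caller a reference to the queued update; held-back messages are regenerated only once persistence completes.

```rust
// Illustrative sketch only -- not LDK's actual types or API.
#[derive(Debug)]
struct MonitorUpdate {
    update_id: u64,
}

#[derive(Default)]
struct ChannelSketch {
    latest_monitor_update_id: u64,
    pending_monitor_updates: Vec<MonitorUpdate>,
    monitor_update_in_progress: bool,
}

impl ChannelSketch {
    /// Queue an update, mark the channel as blocked on monitor persistence,
    /// and return a reference the manager can pass to its chain monitor.
    fn queue_monitor_update(&mut self) -> &MonitorUpdate {
        self.latest_monitor_update_id += 1;
        self.monitor_update_in_progress = true; // messages are regenerated once this clears
        self.pending_monitor_updates.push(MonitorUpdate {
            update_id: self.latest_monitor_update_id,
        });
        self.pending_monitor_updates.last().unwrap()
    }

    /// Called once persistence completes; queued updates up to `update_id`
    /// can be dropped and any held-back messages regenerated.
    fn monitor_update_completed(&mut self, update_id: u64) {
        self.pending_monitor_updates.retain(|upd| upd.update_id > update_id);
        if self.pending_monitor_updates.is_empty() {
            self.monitor_update_in_progress = false;
        }
    }
}

fn main() {
    let mut chan = ChannelSketch::default();
    let update_id = chan.queue_monitor_update().update_id;
    // The manager would hand the referenced update to its chain monitor here,
    // then call back once persistence finishes:
    chan.monitor_update_completed(update_id);
    assert!(!chan.monitor_update_in_progress);
}
```

Returning a reference to an update the channel itself owns (rather than handing out peer messages) is what lets pending `ChannelMonitorUpdate`s be kept around and surfaced again at startup, as the commit message describes.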

3 files changed: +71 -80 lines changed


lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 9 additions & 1 deletion
@@ -2587,7 +2587,15 @@ fn test_permanent_error_during_sending_shutdown() {
     chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);

     assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
-    check_closed_broadcast!(nodes[0], true);
+
+    // We always send the `shutdown` response when initiating a shutdown, even if we immediately
+    // close the channel thereafter.
+    let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+    assert_eq!(msg_events.len(), 3);
+    if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
+    if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
+    if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }
+
     check_added_monitors!(nodes[0], 2);
     check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
 }

lightning/src/ln/channel.rs

Lines changed: 25 additions & 29 deletions
@@ -3186,16 +3186,16 @@ impl<Signer: Sign> Channel<Signer> {
     /// Public version of the below, checking relevant preconditions first.
     /// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
     /// returns `(None, Vec::new())`.
-    pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+    pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
         if self.channel_state >= ChannelState::ChannelReady as u32 &&
            (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
             self.free_holding_cell_htlcs(logger)
-        } else { Ok((None, Vec::new())) }
+        } else { (None, Vec::new()) }
     }

     /// Frees any pending commitment updates in the holding cell, generating the relevant messages
     /// for our counterparty.
-    fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+    fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
         assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
         if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
             log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(),
@@ -3276,16 +3276,16 @@ impl<Signer: Sign> Channel<Signer> {
                 }
             }
             if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
-                return Ok((None, htlcs_to_fail));
+                return (None, htlcs_to_fail);
             }
             let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
                 self.send_update_fee(feerate, false, logger)
             } else {
                 None
             };

-            let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
-            // send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
+            let mut additional_update = self.build_commitment_no_status_check(logger);
+            // build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
             // but we want them to be strictly increasing by one, so reset it here.
             self.latest_monitor_update_id = monitor_update.update_id;
             monitor_update.updates.append(&mut additional_update.updates);
@@ -3294,16 +3294,11 @@ impl<Signer: Sign> Channel<Signer> {
                 log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" },
                 update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());

-            Ok((Some((msgs::CommitmentUpdate {
-                update_add_htlcs,
-                update_fulfill_htlcs,
-                update_fail_htlcs,
-                update_fail_malformed_htlcs: Vec::new(),
-                update_fee,
-                commitment_signed,
-            }, monitor_update)), htlcs_to_fail))
+            self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+            self.pending_monitor_updates.push(monitor_update);
+            (Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
         } else {
-            Ok((None, Vec::new()))
+            (None, Vec::new())
         }
     }

@@ -3513,17 +3508,9 @@ impl<Signer: Sign> Channel<Signer> {
             return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
         }

-        match self.free_holding_cell_htlcs(logger)? {
-            (Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
-                commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
-                for fail_msg in update_fail_htlcs.drain(..) {
-                    commitment_update.update_fail_htlcs.push(fail_msg);
-                }
-                commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len());
-                for fail_msg in update_fail_malformed_htlcs.drain(..) {
-                    commitment_update.update_fail_malformed_htlcs.push(fail_msg);
-                }
-
+        match self.free_holding_cell_htlcs(logger) {
+            (Some(_), htlcs_to_fail) => {
+                let mut additional_update = self.pending_monitor_updates.pop().unwrap();
                 // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
                 // strictly increasing by one, so decrement it here.
                 self.latest_monitor_update_id = monitor_update.update_id;
@@ -5843,8 +5830,11 @@ impl<Signer: Sign> Channel<Signer> {

     /// Begins the shutdown process, getting a message for the remote peer and returning all
     /// holding cell HTLCs for payment failure.
+    ///
+    /// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]) in which case no
+    /// [`ChannelMonitorUpdate`] will be returned).
     pub fn get_shutdown<K: Deref>(&mut self, keys_provider: &K, their_features: &InitFeatures, target_feerate_sats_per_kw: Option<u32>)
-    -> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+    -> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
     where K::Target: KeysInterface {
         for htlc in self.pending_outbound_htlcs.iter() {
             if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5887,12 +5877,15 @@ impl<Signer: Sign> Channel<Signer> {

         let monitor_update = if update_shutdown_script {
             self.latest_monitor_update_id += 1;
-            Some(ChannelMonitorUpdate {
+            let monitor_update = ChannelMonitorUpdate {
                 update_id: self.latest_monitor_update_id,
                 updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
                     scriptpubkey: self.get_closing_scriptpubkey(),
                 }],
-            })
+            };
+            self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+            self.pending_monitor_updates.push(monitor_update);
+            Some(self.pending_monitor_updates.last().unwrap())
         } else { None };
         let shutdown = msgs::Shutdown {
             channel_id: self.channel_id,
@@ -5913,6 +5906,9 @@ impl<Signer: Sign> Channel<Signer> {
             }
         });

+        debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
+            "we can't both complete shutdown and return a monitor update");
+
         Ok((shutdown, monitor_update, dropped_outbound_htlcs))
     }


lightning/src/ln/channelmanager.rs

Lines changed: 37 additions & 50 deletions
@@ -1936,7 +1936,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
         if *counterparty_node_id != chan_entry.get().get_counterparty_node_id(){
             return Err(APIError::APIMisuseError { err: "The passed counterparty_node_id doesn't match the channel's counterparty node_id".to_owned() });
         }
-        let (shutdown_msg, monitor_update, htlcs) = {
+        let funding_txo_opt = chan_entry.get().get_funding_txo();
+        let (shutdown_msg, mut monitor_update_opt, htlcs) = {
             let per_peer_state = self.per_peer_state.read().unwrap();
             match per_peer_state.get(&counterparty_node_id) {
                 Some(peer_state) => {
@@ -1949,22 +1950,21 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
         };
         failed_htlcs = htlcs;

-        // Update the monitor with the shutdown script if necessary.
-        if let Some(monitor_update) = monitor_update {
-            let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-            let (result, is_permanent) =
-                handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-            if is_permanent {
-                remove_channel!(self, chan_entry);
-                break result;
-            }
-        }
-
+        // We can send the `shutdown` message before updating the `ChannelMonitor`
+        // here as we don't need the monitor update to complete until we send a
+        // `shutdown_signed`, which we'll delay if we're pending a monitor update.
         channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
             node_id: *counterparty_node_id,
-            msg: shutdown_msg
+            msg: shutdown_msg,
         });

+        // Update the monitor with the shutdown script if necessary.
+        if let Some(monitor_update) = monitor_update_opt.take() {
+            let update_id = monitor_update.update_id;
+            let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+            break handle_new_monitor_update!(self, update_res, update_id, channel_state_lock, channel_state.pending_msg_events, chan_entry);
+        }
+
         if chan_entry.get().is_shutdown() {
             let channel = remove_channel!(self, chan_entry);
             if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
@@ -5479,48 +5479,35 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
         let mut has_monitor_update = false;
         let mut failed_htlcs = Vec::new();
         let mut handle_errors = Vec::new();
-        {
+        'outer_loop: loop {
             let mut channel_state_lock = self.channel_state.lock().unwrap();
             let channel_state = &mut *channel_state_lock;
-            let by_id = &mut channel_state.by_id;
-            let pending_msg_events = &mut channel_state.pending_msg_events;

-            by_id.retain(|channel_id, chan| {
-                match chan.maybe_free_holding_cell_htlcs(&self.logger) {
-                    Ok((commitment_opt, holding_cell_failed_htlcs)) => {
-                        if !holding_cell_failed_htlcs.is_empty() {
-                            failed_htlcs.push((
-                                holding_cell_failed_htlcs,
-                                *channel_id,
-                                chan.get_counterparty_node_id()
-                            ));
-                        }
-                        if let Some((commitment_update, monitor_update)) = commitment_opt {
-                            match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
-                                ChannelMonitorUpdateStatus::Completed => {
-                                    pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                        node_id: chan.get_counterparty_node_id(),
-                                        updates: commitment_update,
-                                    });
-                                },
-                                e => {
-                                    has_monitor_update = true;
-                                    let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
-                                    handle_errors.push((chan.get_counterparty_node_id(), res));
-                                    if close_channel { return false; }
-                                },
-                            }
-                        }
-                        true
-                    },
-                    Err(e) => {
-                        let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
-                        handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
-                        // ChannelClosed event is generated by handle_error for us
-                        !close_channel
+            for (channel_id, chan) in channel_state.by_id.iter_mut() {
+                let counterparty_node_id = chan.get_counterparty_node_id();
+                let funding_txo = chan.get_funding_txo();
+                let (monitor_opt, holding_cell_failed_htlcs) =
+                    chan.maybe_free_holding_cell_htlcs(&self.logger);
+                if !holding_cell_failed_htlcs.is_empty() {
+                    failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+                }
+                if let Some(monitor_update) = monitor_opt {
+                    has_monitor_update = true;
+
+                    let update_res = self.chain_monitor.update_channel(
+                        funding_txo.expect("channel is live"), monitor_update);
+                    let update_id = monitor_update.update_id;
+                    let channel_id: [u8; 32] = *channel_id;
+                    let res = handle_new_monitor_update!(self, update_res, update_id,
+                        channel_state, channel_state.pending_msg_events, chan, MANUALLY_REMOVING,
+                        channel_state.by_id.remove(&channel_id));
+                    if res.is_err() {
+                        handle_errors.push((counterparty_node_id, res));
                     }
+                    continue 'outer_loop;
                 }
-            });
+            }
+            break 'outer_loop;
         }

         let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
