Commit 6168a96

Always process claim ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths.

Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete.

This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later.

This commit updates the `ChannelMonitorUpdate` pipeline for `claim_funds_from_hop`, i.e. the `update_fulfill_htlc`-generation pipeline.
1 parent 08233e8 commit 6168a96
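
For orientation before the per-file diffs, a minimal self-contained Rust sketch of the shape this commit moves toward. All names here (`Channel`, `ChannelMonitorUpdate`, `claim_htlc`) are simplified stand-ins for illustration, not LDK's real types or API:

// Simplified stand-in types -- an illustration of the control flow only,
// not LDK's actual `Channel`/`ChannelMonitorUpdate` API.
#[derive(Debug)]
struct ChannelMonitorUpdate { update_id: u64 }

#[derive(Default)]
struct Channel {
    latest_monitor_update_id: u64,
    // The new in-`Channel` queue of updates; future work will expose this to
    // users at startup so in-flight updates can be completed.
    pending_monitor_updates: Vec<ChannelMonitorUpdate>,
}

impl Channel {
    // New-style update method: no peer messages are returned. We mark the
    // channel as awaiting a monitor update and hand back a reference to the
    // queued `ChannelMonitorUpdate` for the manager to persist.
    fn claim_htlc(&mut self) -> &ChannelMonitorUpdate {
        self.latest_monitor_update_id += 1;
        self.pending_monitor_updates
            .push(ChannelMonitorUpdate { update_id: self.latest_monitor_update_id });
        self.pending_monitor_updates.last().unwrap()
    }
}

fn main() {
    let mut chan = Channel::default();
    let update = chan.claim_htlc();
    // The `ChannelManager` hands `update` to the persister; the
    // update_fulfill/commitment_signed messages are regenerated only once the
    // monitor update completes, instead of being threaded through on success.
    println!("persisting monitor update {:?}", update);
}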

4 files changed (+83, -94 lines)

lightning/src/chain/chainmonitor.rs

Lines changed: 17 additions & 3 deletions
@@ -796,7 +796,7 @@ mod tests {
   use crate::ln::functional_test_utils::*;
   use crate::ln::msgs::ChannelMessageHandler;
   use crate::util::errors::APIError;
-  use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
+  use crate::util::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider};

   #[test]
   fn test_async_ooo_offchain_updates() {
@@ -819,10 +819,8 @@ mod tests {

     nodes[1].node.claim_funds(payment_preimage_1);
     check_added_monitors!(nodes[1], 1);
-    expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
     nodes[1].node.claim_funds(payment_preimage_2);
     check_added_monitors!(nodes[1], 1);
-    expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);

     let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
     assert_eq!(persistences.len(), 1);
@@ -850,8 +848,24 @@ mod tests {
       .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
     assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
     assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+    assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
     nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();

+    let claim_events = nodes[1].node.get_and_clear_pending_events();
+    assert_eq!(claim_events.len(), 2);
+    match claim_events[0] {
+      Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
+        assert_eq!(payment_hash_1, *payment_hash);
+      },
+      _ => panic!("Unexpected event"),
+    }
+    match claim_events[1] {
+      Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
+        assert_eq!(payment_hash_2, *payment_hash);
+      },
+      _ => panic!("Unexpected event"),
+    }
+
     // Now manually walk the commitment signed dance - because we claimed two payments
     // back-to-back it doesn't fit into the neat walk commitment_signed_dance does.

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 22 additions & 10 deletions
@@ -1602,7 +1602,6 @@ fn test_monitor_update_fail_claim() {

   chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
   nodes[1].node.claim_funds(payment_preimage_1);
-  expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
   assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
   check_added_monitors!(nodes[1], 1);

@@ -1628,6 +1627,7 @@
   let events = nodes[1].node.get_and_clear_pending_msg_events();
   assert_eq!(events.len(), 0);
   commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true);
+  expect_pending_htlcs_forwardable_ignore!(nodes[1]);

   let (_, payment_hash_3, payment_secret_3) = get_payment_preimage_hash!(nodes[0]);
   nodes[2].node.send_payment(&route, payment_hash_3, &Some(payment_secret_3), PaymentId(payment_hash_3.0)).unwrap();
@@ -1645,6 +1645,7 @@
   let channel_id = chan_1.2;
   let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
   nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
+  expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
   check_added_monitors!(nodes[1], 0);

   let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1653,7 +1654,7 @@
   expect_payment_sent!(nodes[0], payment_preimage_1);

   // Get the payment forwards, note that they were batched into one commitment update.
-  expect_pending_htlcs_forwardable!(nodes[1]);
+  nodes[1].node.process_pending_htlc_forwards();
   check_added_monitors!(nodes[1], 1);
   let bs_forward_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
   nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &bs_forward_update.update_add_htlcs[0]);
@@ -1796,14 +1797,14 @@ fn monitor_update_claim_fail_no_response() {

   chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
   nodes[1].node.claim_funds(payment_preimage_1);
-  expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
   check_added_monitors!(nodes[1], 1);

   assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

   chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
   let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
   nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
+  expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
   check_added_monitors!(nodes[1], 0);
   assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

@@ -2283,7 +2284,6 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
   chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
   nodes[0].node.claim_funds(payment_preimage_0);
   check_added_monitors!(nodes[0], 1);
-  expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);

   nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &send.msgs[0]);
   nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &send.commitment_msg);
@@ -2346,6 +2346,7 @@
   chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
   let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
   nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id);
+  expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);

   // New outbound messages should be generated immediately upon a call to
   // get_and_clear_pending_msg_events (but not before).
@@ -2644,15 +2645,13 @@ fn double_temp_error() {
   // `claim_funds` results in a ChannelMonitorUpdate.
   nodes[1].node.claim_funds(payment_preimage_1);
   check_added_monitors!(nodes[1], 1);
-  expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
   let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();

   chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
   // Previously, this would've panicked due to a double-call to `Channel::monitor_update_failed`,
   // which had some asserts that prevented it from being called twice.
   nodes[1].node.claim_funds(payment_preimage_2);
   check_added_monitors!(nodes[1], 1);
-  expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
   chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);

   let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
@@ -2661,11 +2660,24 @@
   check_added_monitors!(nodes[1], 0);
   nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2);

-  // Complete the first HTLC.
-  let events = nodes[1].node.get_and_clear_pending_msg_events();
-  assert_eq!(events.len(), 1);
+  // Complete the first HTLC. Note that as a side-effect we handle the monitor update completions
+  // and get both PaymentClaimed events at once.
+  let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
+
+  let events = nodes[1].node.get_and_clear_pending_events();
+  assert_eq!(events.len(), 2);
+  match events[0] {
+    Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_1),
+    _ => panic!("Unexpected Event: {:?}", events[0]),
+  }
+  match events[1] {
+    Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_2),
+    _ => panic!("Unexpected Event: {:?}", events[1]),
+  }
+
+  assert_eq!(msg_events.len(), 1);
   let (update_fulfill_1, commitment_signed_b1, node_id) = {
-    match &events[0] {
+    match &msg_events[0] {
       &MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
         assert!(update_add_htlcs.is_empty());
         assert_eq!(update_fulfill_htlcs.len(), 1);

lightning/src/ln/channel.rs

Lines changed: 21 additions & 16 deletions
@@ -392,18 +392,15 @@ enum UpdateFulfillFetch {
 }

 /// The return type of get_update_fulfill_htlc_and_commit.
-pub enum UpdateFulfillCommitFetch {
+pub enum UpdateFulfillCommitFetch<'a> {
   /// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
   /// it in the holding cell, or re-generated the update_fulfill message after the same claim was
   /// previously placed in the holding cell (and has since been removed).
   NewClaim {
     /// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
-    monitor_update: ChannelMonitorUpdate,
+    monitor_update: &'a ChannelMonitorUpdate,
     /// The value of the HTLC which was claimed, in msat.
     htlc_value_msat: u64,
-    /// The update_fulfill message and commitment_signed message (if the claim was not placed
-    /// in the holding cell).
-    msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>,
   },
   /// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell
   /// or has been forgotten (presumably previously claimed).
@@ -1933,22 +1930,30 @@ impl<Signer: Sign> Channel<Signer> {
     }
   }

-  pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result<UpdateFulfillCommitFetch, (ChannelError, ChannelMonitorUpdate)> where L::Target: Logger {
+  pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
     match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
-      UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => {
-        let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) {
-          Err(e) => return Err((e, monitor_update)),
-          Ok(res) => res
-        };
-        // send_commitment_no_status_check may bump latest_monitor_id but we want them to be
+      UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
+        let mut additional_update = self.build_commitment_no_status_check(logger);
+        // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
         // strictly increasing by one, so decrement it here.
         self.latest_monitor_update_id = monitor_update.update_id;
         monitor_update.updates.append(&mut additional_update.updates);
-        Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) })
+        self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+        self.pending_monitor_updates.push(monitor_update);
+        UpdateFulfillCommitFetch::NewClaim {
+          monitor_update: self.pending_monitor_updates.last().unwrap(),
+          htlc_value_msat,
+        }
       },
-      UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } =>
-        Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }),
-      UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}),
+      UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
+        self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+        self.pending_monitor_updates.push(monitor_update);
+        UpdateFulfillCommitFetch::NewClaim {
+          monitor_update: self.pending_monitor_updates.last().unwrap(),
+          htlc_value_msat,
+        }
+      }
+      UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
     }
   }
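
One detail worth calling out in the diff above: `UpdateFulfillCommitFetch` now borrows its `ChannelMonitorUpdate` out of the channel's new `pending_monitor_updates` queue, so the reference stays valid exactly as long as the caller's `&mut` borrow of the `Channel`. A minimal sketch of that queue-and-borrow pattern, with hypothetical stand-in types rather than LDK's real ones:

struct MonitorUpdate { update_id: u64 }

// Mirrors the `UpdateFulfillCommitFetch<'a>` shape: the update is owned by
// the channel's queue and only borrowed by the caller.
enum Fetch<'a> {
    NewClaim { monitor_update: &'a MonitorUpdate },
    DuplicateClaim,
}

#[derive(Default)]
struct Chan { pending_monitor_updates: Vec<MonitorUpdate>, next_id: u64 }

impl Chan {
    fn fulfill(&mut self, new_claim: bool) -> Fetch<'_> {
        if !new_claim { return Fetch::DuplicateClaim; }
        self.next_id += 1;
        self.pending_monitor_updates.push(MonitorUpdate { update_id: self.next_id });
        // Push first, then borrow the queued copy: the channel remains the
        // owner, which is what lets pending updates be surfaced at startup.
        Fetch::NewClaim { monitor_update: self.pending_monitor_updates.last().unwrap() }
    }
}

fn main() {
    let mut chan = Chan::default();
    if let Fetch::NewClaim { monitor_update } = chan.fulfill(true) {
        println!("queued update {}", monitor_update.update_id);
    }
}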

lightning/src/ln/channelmanager.rs

Lines changed: 23 additions & 65 deletions
@@ -3874,7 +3874,6 @@ where
     //TODO: Delay the claimed_funds relaying just like we do outbound relay!

     let chan_id = prev_hop.outpoint.to_channel_id();
-
     let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
       Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
       None => None
@@ -3888,70 +3887,31 @@
     } else { (false, None) };

     if found_channel {
-      let peer_state = &mut *peer_state_opt.as_mut().unwrap();
+      let peer_state: &mut PeerState<_> = &mut *peer_state_opt.as_mut().unwrap();
       if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
         let counterparty_node_id = chan.get().get_counterparty_node_id();
-        match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
-          Ok(msgs_monitor_option) => {
-            if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
-              match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
-                ChannelMonitorUpdateStatus::Completed => {},
-                e => {
-                  log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
-                    "Failed to update channel monitor with preimage {:?}: {:?}",
-                    payment_preimage, e);
-                  let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
-                  mem::drop(peer_state_opt);
-                  mem::drop(per_peer_state_lock);
-                  self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
-                  return Err((counterparty_node_id, err));
-                }
-              }
-              if let Some((msg, commitment_signed)) = msgs {
-                log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
-                  log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
-                peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                  node_id: counterparty_node_id,
-                  updates: msgs::CommitmentUpdate {
-                    update_add_htlcs: Vec::new(),
-                    update_fulfill_htlcs: vec![msg],
-                    update_fail_htlcs: Vec::new(),
-                    update_fail_malformed_htlcs: Vec::new(),
-                    update_fee: None,
-                    commitment_signed,
-                  }
-                });
-              }
-              mem::drop(peer_state_opt);
-              mem::drop(per_peer_state_lock);
-              self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
-              Ok(())
-            } else {
-              Ok(())
-            }
-          },
-          Err((e, monitor_update)) => {
-            match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
-              ChannelMonitorUpdateStatus::Completed => {},
-              e => {
-                // TODO: This needs to be handled somehow - if we receive a monitor update
-                // with a preimage we *must* somehow manage to propagate it to the upstream
-                // channel, or we must have an ability to receive the same update and try
-                // again on restart.
-                log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
-                  "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
-                  payment_preimage, e);
-              },
-            }
-            let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
-            if drop {
-              chan.remove_entry();
-            }
-            mem::drop(peer_state_opt);
-            mem::drop(per_peer_state_lock);
-            self.handle_monitor_update_completion_actions(completion_action(None));
-            Err((counterparty_node_id, res))
-          },
+
+        let monitor_option = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
+        if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = monitor_option {
+          if let Some(action) = completion_action(Some(htlc_value_msat)) {
+            log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+              log_bytes!(chan_id), action);
+            peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+          }
+          let update_id = monitor_update.update_id;
+          let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
+          let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_opt,
+            peer_state, chan);
+          if let Err(e) = res {
+            // TODO: This is a *critical* error - we probably updated some other
+            // monitors with a preimage. We should retry this monitor update over
+            // and over again until morale improves.
+            log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
+            return Err((counterparty_node_id, e));
+          }
+          Ok(())
+        } else {
+          Ok(())
         }
       } else {
         // We've held the peer_state mutex since finding the channel and setting
@@ -6887,8 +6847,6 @@ where
     // LDK versions prior to 0.0.113 do not know how to read the pending claimed payments
     // map. Thus, if there are no entries we skip writing a TLV for it.
     pending_claiming_payments = None;
-  } else {
-    debug_assert!(false, "While we have code to serialize pending_claiming_payments, the map should always be empty until a later PR");
   }

   write_tlv_fields!(writer, {
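
The ordering in the new `claim_funds_from_hop` code matters: the completion action is recorded in `monitor_update_blocked_actions` before the `ChannelMonitorUpdate` is handed to the chain monitor, so the `PaymentClaimed` event only surfaces once the update completes (which is exactly what the test changes above now assert). A rough model of that bookkeeping, again with hypothetical stand-in types rather than LDK's real ones:

use std::collections::HashMap;

#[derive(Debug)]
enum CompletionAction { EmitPaymentClaimed { amount_msat: u64 } }

enum UpdateStatus { Completed, InProgress }

#[derive(Default)]
struct Manager { blocked_actions: HashMap<u64, Vec<CompletionAction>> }

impl Manager {
    fn claim(&mut self, chan_id: u64, amount_msat: u64, persist: UpdateStatus) {
        // Record the action *before* handing the update to the persister.
        self.blocked_actions.entry(chan_id).or_insert(Vec::new())
            .push(CompletionAction::EmitPaymentClaimed { amount_msat });
        match persist {
            // Synchronous persistence: run the queued actions immediately.
            UpdateStatus::Completed => self.monitor_update_completed(chan_id),
            // Async persistence: actions stay queued until the monitor
            // signals completion (channel_monitor_updated in the tests).
            UpdateStatus::InProgress => {},
        }
    }

    fn monitor_update_completed(&mut self, chan_id: u64) {
        for action in self.blocked_actions.remove(&chan_id).unwrap_or_default() {
            println!("running {:?}", action); // e.g. emit Event::PaymentClaimed
        }
    }
}

fn main() {
    let mut mgr = Manager::default();
    mgr.claim(42, 1_000_000, UpdateStatus::InProgress);
    // No PaymentClaimed yet; it fires when the monitor update completes:
    mgr.monitor_update_completed(42);
}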
