Skip to content

Commit 680ecf9

Browse files
committed
Always process claim ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths. Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete. This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later. This commit updates the `ChannelMonitorUpdate` pipeline for `claim_funds_from_hop`, ie the `update_fulfill_htlc`-generation pipeline.
1 parent a296dac commit 680ecf9

File tree

4 files changed

+89
-89
lines changed

4 files changed

+89
-89
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ mod tests {
796796
use crate::ln::functional_test_utils::*;
797797
use crate::ln::msgs::ChannelMessageHandler;
798798
use crate::util::errors::APIError;
799-
use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
799+
use crate::util::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider};
800800

801801
#[test]
802802
fn test_async_ooo_offchain_updates() {
@@ -818,10 +818,8 @@ mod tests {
818818

819819
nodes[1].node.claim_funds(payment_preimage_1);
820820
check_added_monitors!(nodes[1], 1);
821-
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
822821
nodes[1].node.claim_funds(payment_preimage_2);
823822
check_added_monitors!(nodes[1], 1);
824-
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
825823

826824
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
827825

@@ -851,8 +849,24 @@ mod tests {
851849
.find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
852850
assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
853851
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
852+
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
854853
nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
855854

855+
let claim_events = nodes[1].node.get_and_clear_pending_events();
856+
assert_eq!(claim_events.len(), 2);
857+
match claim_events[0] {
858+
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
859+
assert_eq!(payment_hash_1, *payment_hash);
860+
},
861+
_ => panic!("Unexpected event"),
862+
}
863+
match claim_events[1] {
864+
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
865+
assert_eq!(payment_hash_2, *payment_hash);
866+
},
867+
_ => panic!("Unexpected event"),
868+
}
869+
856870
// Now manually walk the commitment signed dance - because we claimed two payments
857871
// back-to-back it doesn't fit into the neat walk commitment_signed_dance does.
858872

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1590,7 +1590,6 @@ fn test_monitor_update_fail_claim() {
15901590

15911591
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
15921592
nodes[1].node.claim_funds(payment_preimage_1);
1593-
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
15941593
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
15951594
check_added_monitors!(nodes[1], 1);
15961595

@@ -1616,6 +1615,7 @@ fn test_monitor_update_fail_claim() {
16161615
let events = nodes[1].node.get_and_clear_pending_msg_events();
16171616
assert_eq!(events.len(), 0);
16181617
commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true);
1618+
expect_pending_htlcs_forwardable_ignore!(nodes[1]);
16191619

16201620
let (_, payment_hash_3, payment_secret_3) = get_payment_preimage_hash!(nodes[0]);
16211621
nodes[2].node.send_payment(&route, payment_hash_3, &Some(payment_secret_3), PaymentId(payment_hash_3.0)).unwrap();
@@ -1633,6 +1633,7 @@ fn test_monitor_update_fail_claim() {
16331633
let channel_id = chan_1.2;
16341634
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
16351635
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
1636+
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
16361637
check_added_monitors!(nodes[1], 0);
16371638

16381639
let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1641,7 +1642,7 @@ fn test_monitor_update_fail_claim() {
16411642
expect_payment_sent!(nodes[0], payment_preimage_1);
16421643

16431644
// Get the payment forwards, note that they were batched into one commitment update.
1644-
expect_pending_htlcs_forwardable!(nodes[1]);
1645+
nodes[1].node.process_pending_htlc_forwards();
16451646
check_added_monitors!(nodes[1], 1);
16461647
let bs_forward_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
16471648
nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &bs_forward_update.update_add_htlcs[0]);
@@ -1784,14 +1785,14 @@ fn monitor_update_claim_fail_no_response() {
17841785

17851786
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
17861787
nodes[1].node.claim_funds(payment_preimage_1);
1787-
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
17881788
check_added_monitors!(nodes[1], 1);
17891789

17901790
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
17911791

17921792
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
17931793
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
17941794
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
1795+
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
17951796
check_added_monitors!(nodes[1], 0);
17961797
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
17971798

@@ -2270,7 +2271,6 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
22702271
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
22712272
nodes[0].node.claim_funds(payment_preimage_0);
22722273
check_added_monitors!(nodes[0], 1);
2273-
expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
22742274

22752275
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &send.msgs[0]);
22762276
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &send.commitment_msg);
@@ -2333,6 +2333,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
23332333
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
23342334
let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
23352335
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id);
2336+
expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
23362337

23372338
// New outbound messages should be generated immediately upon a call to
23382339
// get_and_clear_pending_msg_events (but not before).
@@ -2631,15 +2632,13 @@ fn double_temp_error() {
26312632
// `claim_funds` results in a ChannelMonitorUpdate.
26322633
nodes[1].node.claim_funds(payment_preimage_1);
26332634
check_added_monitors!(nodes[1], 1);
2634-
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
26352635
let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
26362636

26372637
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
26382638
// Previously, this would've panicked due to a double-call to `Channel::monitor_update_failed`,
26392639
// which had some asserts that prevented it from being called twice.
26402640
nodes[1].node.claim_funds(payment_preimage_2);
26412641
check_added_monitors!(nodes[1], 1);
2642-
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
26432642
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
26442643

26452644
let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
@@ -2648,11 +2647,24 @@ fn double_temp_error() {
26482647
check_added_monitors!(nodes[1], 0);
26492648
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2);
26502649

2651-
// Complete the first HTLC.
2652-
let events = nodes[1].node.get_and_clear_pending_msg_events();
2653-
assert_eq!(events.len(), 1);
2650+
// Complete the first HTLC. Note that as a side-effect we handle the monitor update completions
2651+
// and get both PaymentClaimed events at once.
2652+
let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
2653+
2654+
let events = nodes[1].node.get_and_clear_pending_events();
2655+
assert_eq!(events.len(), 2);
2656+
match events[0] {
2657+
Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_1),
2658+
_ => panic!("Unexpected Event: {:?}", events[0]),
2659+
}
2660+
match events[1] {
2661+
Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_2),
2662+
_ => panic!("Unexpected Event: {:?}", events[1]),
2663+
}
2664+
2665+
assert_eq!(msg_events.len(), 1);
26542666
let (update_fulfill_1, commitment_signed_b1, node_id) = {
2655-
match &events[0] {
2667+
match &msg_events[0] {
26562668
&MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
26572669
assert!(update_add_htlcs.is_empty());
26582670
assert_eq!(update_fulfill_htlcs.len(), 1);

lightning/src/ln/channel.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -392,18 +392,15 @@ enum UpdateFulfillFetch {
392392
}
393393

394394
/// The return type of get_update_fulfill_htlc_and_commit.
395-
pub enum UpdateFulfillCommitFetch {
395+
pub enum UpdateFulfillCommitFetch<'a> {
396396
/// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
397397
/// it in the holding cell, or re-generated the update_fulfill message after the same claim was
398398
/// previously placed in the holding cell (and has since been removed).
399399
NewClaim {
400400
/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
401-
monitor_update: ChannelMonitorUpdate,
401+
monitor_update: &'a ChannelMonitorUpdate,
402402
/// The value of the HTLC which was claimed, in msat.
403403
htlc_value_msat: u64,
404-
/// The update_fulfill message and commitment_signed message (if the claim was not placed
405-
/// in the holding cell).
406-
msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>,
407404
},
408405
/// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell
409406
/// or has been forgotten (presumably previously claimed).
@@ -1931,22 +1928,30 @@ impl<Signer: Sign> Channel<Signer> {
19311928
}
19321929
}
19331930

1934-
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result<UpdateFulfillCommitFetch, (ChannelError, ChannelMonitorUpdate)> where L::Target: Logger {
1931+
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
19351932
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
1936-
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => {
1937-
let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) {
1938-
Err(e) => return Err((e, monitor_update)),
1939-
Ok(res) => res
1940-
};
1941-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
1933+
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
1934+
let mut additional_update = self.build_commitment_no_status_check(logger);
1935+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
19421936
// strictly increasing by one, so decrement it here.
19431937
self.latest_monitor_update_id = monitor_update.update_id;
19441938
monitor_update.updates.append(&mut additional_update.updates);
1945-
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) })
1939+
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
1940+
self.pending_monitor_updates.push(monitor_update);
1941+
UpdateFulfillCommitFetch::NewClaim {
1942+
monitor_update: self.pending_monitor_updates.last().unwrap(),
1943+
htlc_value_msat,
1944+
}
19461945
},
1947-
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } =>
1948-
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }),
1949-
UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}),
1946+
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
1947+
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
1948+
self.pending_monitor_updates.push(monitor_update);
1949+
UpdateFulfillCommitFetch::NewClaim {
1950+
monitor_update: self.pending_monitor_updates.last().unwrap(),
1951+
htlc_value_msat,
1952+
}
1953+
}
1954+
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
19501955
}
19511956
}
19521957

lightning/src/ln/channelmanager.rs

Lines changed: 29 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4301,64 +4301,34 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
43014301
let channel_state = &mut *channel_state_lock;
43024302
if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(chan_id) {
43034303
let counterparty_node_id = chan.get().get_counterparty_node_id();
4304-
match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
4305-
Ok(msgs_monitor_option) => {
4306-
if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
4307-
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
4308-
ChannelMonitorUpdateStatus::Completed => {},
4309-
e => {
4310-
log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
4311-
"Failed to update channel monitor with preimage {:?}: {:?}",
4312-
payment_preimage, e);
4313-
let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
4314-
mem::drop(channel_state_lock);
4315-
self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
4316-
return Err((counterparty_node_id, err));
4317-
}
4318-
}
4319-
if let Some((msg, commitment_signed)) = msgs {
4320-
log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
4321-
log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
4322-
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
4323-
node_id: chan.get().get_counterparty_node_id(),
4324-
updates: msgs::CommitmentUpdate {
4325-
update_add_htlcs: Vec::new(),
4326-
update_fulfill_htlcs: vec![msg],
4327-
update_fail_htlcs: Vec::new(),
4328-
update_fail_malformed_htlcs: Vec::new(),
4329-
update_fee: None,
4330-
commitment_signed,
4331-
}
4332-
});
4333-
}
4334-
mem::drop(channel_state_lock);
4335-
self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
4336-
Ok(())
4337-
} else {
4338-
Ok(())
4339-
}
4340-
},
4341-
Err((e, monitor_update)) => {
4342-
match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
4343-
ChannelMonitorUpdateStatus::Completed => {},
4344-
e => {
4345-
// TODO: This needs to be handled somehow - if we receive a monitor update
4346-
// with a preimage we *must* somehow manage to propagate it to the upstream
4347-
// channel, or we must have an ability to receive the same update and try
4348-
// again on restart.
4349-
log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
4350-
"Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
4351-
payment_preimage, e);
4352-
},
4353-
}
4354-
let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
4355-
if drop {
4356-
chan.remove_entry();
4357-
}
4358-
mem::drop(channel_state_lock);
4359-
self.handle_monitor_update_completion_actions(completion_action(None));
4360-
Err((counterparty_node_id, res))
4361-
},
4304+
let push_action = |htlc_value_msat| {
4305+
if let Some(action) = completion_action(htlc_value_msat) {
4306+
let per_peer_state = self.per_peer_state.read().unwrap();
4307+
let mut peer_state = per_peer_state.get(&counterparty_node_id)
4308+
.expect("XXX: This may be reachable today, I believe, but once we move the channel storage to per_peer_state it won't be.")
4309+
.lock().unwrap();
4310+
log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
4311+
log_bytes!(chan_id), action);
4312+
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
4313+
}
4314+
};
4315+
4316+
let monitor_option = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
4317+
if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = monitor_option {
4318+
push_action(Some(htlc_value_msat));
4319+
let update_id = monitor_update.update_id;
4320+
let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
4321+
let res = handle_new_monitor_update!(self, update_res, update_id, channel_state_lock, channel_state.pending_msg_events, chan);
4322+
if let Err(e) = res {
4323+
// TODO: This is a *critical* error - we probably updated some other
4324+
// monitors with a preimage. We should retry this monitor update over
4325+
// and over again until morale improves.
4326+
log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
4327+
return Err((counterparty_node_id, e));
4328+
}
4329+
Ok(())
4330+
} else {
4331+
Ok(())
43624332
}
43634333
} else {
43644334
let preimage_update = ChannelMonitorUpdate {
@@ -4384,6 +4354,7 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
43844354
// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
43854355
// generally always allowed to be duplicative (and it's specifically noted in
43864356
// `PaymentForwarded`).
4357+
push_action(None);
43874358
self.handle_monitor_update_completion_actions(completion_action(None));
43884359
Ok(())
43894360
}
@@ -7181,8 +7152,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelMana
71817152
// LDK versions prior to 0.0.113 do not know how to read the pending claimed payments
71827153
// map. Thus, if there are no entries we skip writing a TLV for it.
71837154
pending_claiming_payments = None;
7184-
} else {
7185-
debug_assert!(false, "While we have code to serialize pending_claiming_payments, the map should always be empty until a later PR");
71867155
}
71877156

71887157
write_tlv_fields!(writer, {

0 commit comments

Comments
 (0)