Always process ChannelMonitorUpdates asynchronously #1897

Merged
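
The tests changed below all exercise the same pattern now that `ChannelMonitorUpdate`s complete asynchronously: the persister first reports `ChannelMonitorUpdateStatus::InProgress`, the off-chain action (typically a `claim_funds` call) stays pending, and user-visible events such as `PaymentClaimed` only surface once the update is marked complete. The following is a condensed, illustrative sketch of that flow, stitched together from the test-helper calls that appear in this diff (`set_update_ret`, `force_channel_monitor_updated`, `expect_payment_claimed!`); it assumes the usual functional-test harness bindings (`nodes`, `chanmon_cfgs`, `channel_id`, `payment_preimage_1`, `payment_hash_1`) and is not itself part of the PR.

// The persister reports the monitor update as in-flight.
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
// While the update is pending, neither the PaymentClaimed event nor the
// outbound update_fulfill_htlc is generated.
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

// Once persistence finishes, completion is signaled to the ChainMonitor...
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
// ...and only then does the PaymentClaimed event (and the fulfill message) appear.
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);

When two claims are in flight at once (as in `test_async_ooo_offchain_updates` and `double_temp_error`), both `PaymentClaimed` events are deferred in the same way and are delivered together once their respective updates complete.
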
2 changes: 1 addition & 1 deletion fuzz/src/chanmon_consistency.rs
@@ -295,7 +295,7 @@ fn check_api_err(api_err: APIError) {
// all others. If you hit this panic, the list of acceptable errors
// is probably just stale and you should add new messages here.
match err.as_str() {
"Peer for first hop currently disconnected/pending monitor update!" => {},
"Peer for first hop currently disconnected" => {},
_ if err.starts_with("Cannot push more than their max accepted HTLCs ") => {},
_ if err.starts_with("Cannot send value that would put us over the max HTLC value in flight our peer will accept ") => {},
_ if err.starts_with("Cannot send value that would put our balance under counterparty-announced channel reserve value") => {},
20 changes: 17 additions & 3 deletions lightning/src/chain/chainmonitor.rs
@@ -796,7 +796,7 @@ mod tests {
use crate::ln::functional_test_utils::*;
use crate::ln::msgs::ChannelMessageHandler;
use crate::util::errors::APIError;
use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
use crate::util::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider};

#[test]
fn test_async_ooo_offchain_updates() {
@@ -819,10 +819,8 @@

nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
nodes[1].node.claim_funds(payment_preimage_2);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);

let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
assert_eq!(persistences.len(), 1);
@@ -850,8 +848,24 @@
.find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();

let claim_events = nodes[1].node.get_and_clear_pending_events();
assert_eq!(claim_events.len(), 2);
match claim_events[0] {
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
assert_eq!(payment_hash_1, *payment_hash);
},
_ => panic!("Unexpected event"),
}
match claim_events[1] {
Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
assert_eq!(payment_hash_2, *payment_hash);
},
_ => panic!("Unexpected event"),
}

// Now manually walk the commitment signed dance - because we claimed two payments
// back-to-back it doesn't fit into the neat walk commitment_signed_dance does.

61 changes: 44 additions & 17 deletions lightning/src/ln/chanmon_update_fail_tests.rs
@@ -143,7 +143,7 @@ fn test_monitor_and_persister_update_fail() {
let mut node_0_per_peer_lock;
let mut node_0_peer_state_lock;
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
// Check that even though the persister is returning a InProgress,
// because the update is bogus, ultimately the error that's returned
// should be a PermanentFailure.
@@ -1602,7 +1602,6 @@ fn test_monitor_update_fail_claim() {

chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[1].node.claim_funds(payment_preimage_1);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
check_added_monitors!(nodes[1], 1);

@@ -1628,6 +1627,7 @@
let events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 0);
commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true);
expect_pending_htlcs_forwardable_ignore!(nodes[1]);

let (_, payment_hash_3, payment_secret_3) = get_payment_preimage_hash!(nodes[0]);
nodes[2].node.send_payment(&route, payment_hash_3, &Some(payment_secret_3), PaymentId(payment_hash_3.0)).unwrap();
@@ -1645,6 +1645,7 @@
let channel_id = chan_1.2;
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
check_added_monitors!(nodes[1], 0);

let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1653,7 +1654,7 @@
expect_payment_sent!(nodes[0], payment_preimage_1);

// Get the payment forwards, note that they were batched into one commitment update.
expect_pending_htlcs_forwardable!(nodes[1]);
nodes[1].node.process_pending_htlc_forwards();
check_added_monitors!(nodes[1], 1);
let bs_forward_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &bs_forward_update.update_add_htlcs[0]);
@@ -1739,7 +1740,6 @@ fn test_monitor_update_on_pending_forwards() {
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]);
check_added_monitors!(nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
@@ -1753,17 +1753,17 @@

let events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 3);
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[0] {
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[1] {
assert_eq!(payment_hash, payment_hash_1);
assert!(payment_failed_permanently);
} else { panic!("Unexpected event!"); }
match events[1] {
match events[2] {
Event::PaymentFailed { payment_hash, .. } => {
assert_eq!(payment_hash, payment_hash_1);
},
_ => panic!("Unexpected event"),
}
match events[2] {
match events[0] {
Event::PendingHTLCsForwardable { .. } => { },
_ => panic!("Unexpected event"),
};
@@ -1803,14 +1803,14 @@ fn monitor_update_claim_fail_no_response() {

chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[1].node.claim_funds(payment_preimage_1);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
check_added_monitors!(nodes[1], 1);

assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
check_added_monitors!(nodes[1], 0);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

@@ -2290,7 +2290,6 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[0].node.claim_funds(payment_preimage_0);
check_added_monitors!(nodes[0], 1);
expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);

nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &send.msgs[0]);
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &send.commitment_msg);
@@ -2353,6 +2352,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id);
expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);

// New outbound messages should be generated immediately upon a call to
// get_and_clear_pending_msg_events (but not before).
@@ -2606,7 +2606,15 @@ fn test_permanent_error_during_sending_shutdown() {
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);

assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
check_closed_broadcast!(nodes[0], true);

// We always send the `shutdown` response when initiating a shutdown, even if we immediately
// close the channel thereafter.
let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), 3);
if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }

check_added_monitors!(nodes[0], 2);
check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
}
@@ -2629,7 +2637,15 @@ fn test_permanent_error_during_handling_shutdown() {
assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
let shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &shutdown);
check_closed_broadcast!(nodes[1], true);

// We always send the `shutdown` response when receiving a shutdown, even if we immediately
// close the channel thereafter.
let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), 3);
if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }

check_added_monitors!(nodes[1], 2);
check_closed_event!(nodes[1], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
}
@@ -2651,15 +2667,13 @@ fn double_temp_error() {
// `claim_funds` results in a ChannelMonitorUpdate.
nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();

chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
// Previously, this would've panicked due to a double-call to `Channel::monitor_update_failed`,
// which had some asserts that prevented it from being called twice.
nodes[1].node.claim_funds(payment_preimage_2);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);

let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
@@ -2668,11 +2682,24 @@
check_added_monitors!(nodes[1], 0);
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2);

// Complete the first HTLC.
let events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
// Complete the first HTLC. Note that as a side-effect we handle the monitor update completions
// and get both PaymentClaimed events at once.
let msg_events = nodes[1].node.get_and_clear_pending_msg_events();

let events = nodes[1].node.get_and_clear_pending_events();
assert_eq!(events.len(), 2);
match events[0] {
Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_1),
_ => panic!("Unexpected Event: {:?}", events[0]),
}
match events[1] {
Event::PaymentClaimed { amount_msat: 1_000_000, payment_hash, .. } => assert_eq!(payment_hash, payment_hash_2),
_ => panic!("Unexpected Event: {:?}", events[1]),
}

assert_eq!(msg_events.len(), 1);
let (update_fulfill_1, commitment_signed_b1, node_id) = {
match &events[0] {
match &msg_events[0] {
&MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
assert!(update_add_htlcs.is_empty());
assert_eq!(update_fulfill_htlcs.len(), 1);