Skip to content

Fail holding-cell AddHTLCs on Channel deser to match disconnection #754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 139 additions & 2 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ use bitcoin::network::constants::Network;
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr};
use chain::transaction::OutPoint;
use chain::Watch;
use ln::channelmanager::{RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
use ln::features::InitFeatures;
use ln::msgs;
use ln::msgs::{ChannelMessageHandler, ErrorAction, RoutingMessageHandler};
use routing::router::get_route;
use util::config::UserConfig;
use util::enforcing_trait_impls::EnforcingChannelKeys;
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
use util::errors::APIError;
use util::ser::Readable;
use util::ser::{Readable, ReadableArgs, Writeable};

use bitcoin::hashes::sha256::Hash as Sha256;
use bitcoin::hashes::Hash;
Expand All @@ -35,6 +36,8 @@ use ln::functional_test_utils::*;

use util::test_utils;

use std::collections::HashMap;

// If persister_fail is true, we have the persister return a PermanentFailure
// instead of the higher-level ChainMonitor.
fn do_test_simple_monitor_permanent_update_fail(persister_fail: bool) {
Expand Down Expand Up @@ -1809,6 +1812,140 @@ fn monitor_update_claim_fail_no_response() {
claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2, 1_000_000);
}

#[test]
fn test_chan_reload_discard_outbound_holding() {
// Test that when we reload a ChannelManager from disk we discard (by failing backwards)
// outbound HTLCs sitting in the holding cell. We currently assert that there are no holding
// cell outbound HTLCs when we reconnect to a peer, so this would otherwise fail a
// debug_assertion, but its also good hygiene - if we are sitting on an HTLC when we reload,
// its reasonable to assume its been a while, and, short of having some criteria based on the
// CLTV value, trying to forward it likely doesn't make sense.
// chanmon_fail_consistency found the debug_assertion failure.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let new_chain_monitor;
let node_state_0;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
let logger = test_utils::TestLogger::new();

// Start forwarding a payment, skipping the first RAA so A is in AwaitingRAA
let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(nodes[0]);
{
let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV, &logger).unwrap();
nodes[0].node.send_payment(&route, payment_hash_1, &None).unwrap();
check_added_monitors!(nodes[0], 1);
}

let mut events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
let payment_event = SendEvent::from_event(events.pop().unwrap());
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event.commitment_msg);
check_added_monitors!(nodes[1], 1);

let (bs_revoke_and_ack, bs_commitment_signed) = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());

// Now forward a second payment, getting it stuck in A's outbound holding cell.
let (_, payment_hash_2) = get_payment_preimage_hash!(nodes[0]);
{
let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, &Vec::new(), 1000000, TEST_FINAL_CLTV, &logger).unwrap();
nodes[0].node.send_payment(&route, payment_hash_2, &None).unwrap();
check_added_monitors!(nodes[0], 0);
}

let node_state = nodes[0].node.encode();
let mut chain_monitor_state = test_utils::TestVecWriter(Vec::new());
let funding_outpoint = *nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter().next().unwrap().0;
nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter().next().unwrap().1.serialize_for_disk(&mut chain_monitor_state).unwrap();

// Now if we pass the RAA back to A it should free the holding cell outbound HTLC.
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_revoke_and_ack);
check_added_monitors!(nodes[0], 1);
events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
let payment_event = SendEvent::from_event(events.pop().unwrap());
assert_eq!(payment_event.msgs.len(), 1);

// Reload A's ChannelManager/Monitor and make sure the reload generates a PaymentFailed for the
// second payment.
let mut chain_monitor = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut ::std::io::Cursor::new(chain_monitor_state.0)).unwrap().1;
new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &nodes[0].logger, &node_cfgs[0].fee_estimator, &chanmon_cfgs[0].persister);
nodes[0].chain_monitor = &new_chain_monitor;
node_state_0 = {
let mut channel_monitors = HashMap::new();
channel_monitors.insert(funding_outpoint, &mut chain_monitor);
<(BlockHash, ChannelManager<EnforcingChannelKeys, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut ::std::io::Cursor::new(node_state), ChannelManagerReadArgs {
keys_manager: &nodes[0].keys_manager,
fee_estimator: &node_cfgs[0].fee_estimator,
chain_monitor: &nodes[0].chain_monitor,
logger: &nodes[0].logger,
tx_broadcaster: &nodes[0].tx_broadcaster,
default_config: UserConfig::default(),
channel_monitors,
}).unwrap().1
};
nodes[0].node = &node_state_0;
assert!(nodes[0].chain_monitor.watch_channel(funding_outpoint, chain_monitor).is_ok());
check_added_monitors!(nodes[0], 1);

let events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
Event::PaymentFailed { ref payment_hash, rejected_by_dest, .. } => {
assert_eq!(*payment_hash, payment_hash_2);
assert!(!rejected_by_dest);
},
_ => panic!("Unexpected event"),
}

nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);

nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });

let node_0_reestablish = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReestablish, nodes[1].node.get_our_node_id());
let node_1_reestablish = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id());

nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &node_1_reestablish);
nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &node_0_reestablish);

assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());

// Make sure nodes[1] rebroadcasts the undelivered messages:
let node_1_msgs = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(node_1_msgs.len(), 2);
match node_1_msgs[0] {
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
assert_eq!(*node_id, nodes[0].node.get_our_node_id());
assert!(*msg == bs_revoke_and_ack);
},
_ => panic!(),
}
match node_1_msgs[1] {
MessageSendEvent::UpdateHTLCs { ref node_id, ref updates } => {
assert_eq!(*node_id, nodes[0].node.get_our_node_id());
assert!(updates.commitment_signed == bs_commitment_signed);
},
_ => panic!(),
}

nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_revoke_and_ack);
check_added_monitors!(nodes[0], 1);
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_commitment_signed);
check_added_monitors!(nodes[0], 1);

nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()));
check_added_monitors!(nodes[1], 1);
expect_pending_htlcs_forwardable!(nodes[1]);
expect_payment_received!(nodes[1], payment_hash_1, 1_000_000);

claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_1, 1_000_000);
}

// confirm_a_first and restore_b_before_conf are wholly unrelated to earlier bools and
// restore_b_before_conf has no meaning if !confirm_a_first
fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: bool) {
Expand Down
38 changes: 15 additions & 23 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4064,8 +4064,8 @@ impl Readable for InboundHTLCRemovalReason {

impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
// Note that we write out as if remove_uncommitted_htlcs_and_mark_paused had just been
// called but include holding cell updates (and obviously we don't modify self).
// Note that we write out as if remove_uncommitted_htlcs_and_mark_paused
// had just been called.

writer.write_all(&[SERIALIZATION_VERSION; 1])?;
writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
Expand Down Expand Up @@ -4156,13 +4156,10 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
(self.holding_cell_htlc_updates.len() as u64).write(writer)?;
for update in self.holding_cell_htlc_updates.iter() {
match update {
&HTLCUpdateAwaitingACK::AddHTLC { ref amount_msat, ref cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet } => {
&HTLCUpdateAwaitingACK::AddHTLC { ref payment_hash, ref source, .. } => {
0u8.write(writer)?;
amount_msat.write(writer)?;
cltv_expiry.write(writer)?;
payment_hash.write(writer)?;
source.write(writer)?;
onion_routing_packet.write(writer)?;
payment_hash.write(writer)?;
},
&HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, ref htlc_id } => {
1u8.write(writer)?;
Expand Down Expand Up @@ -4248,7 +4245,7 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
}
}

impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
impl<ChanSigner: ChannelKeys + Readable> Readable for (Channel<ChanSigner>, Vec<(HTLCSource, PaymentHash)>) {
fn read<R : ::std::io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
let _ver: u8 = Readable::read(reader)?;
let min_ver: u8 = Readable::read(reader)?;
Expand Down Expand Up @@ -4312,27 +4309,22 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
});
}

let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)> = Vec::new();
let holding_cell_htlc_update_count: u64 = Readable::read(reader)?;
let mut holding_cell_htlc_updates = Vec::with_capacity(cmp::min(holding_cell_htlc_update_count as usize, OUR_MAX_HTLCS as usize*2));
for _ in 0..holding_cell_htlc_update_count {
holding_cell_htlc_updates.push(match <u8 as Readable>::read(reader)? {
0 => HTLCUpdateAwaitingACK::AddHTLC {
amount_msat: Readable::read(reader)?,
cltv_expiry: Readable::read(reader)?,
payment_hash: Readable::read(reader)?,
source: Readable::read(reader)?,
onion_routing_packet: Readable::read(reader)?,
},
1 => HTLCUpdateAwaitingACK::ClaimHTLC {
match <u8 as Readable>::read(reader)? {
0 => failed_htlcs.push((Readable::read(reader)?, Readable::read(reader)?)),
1 => holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::ClaimHTLC {
payment_preimage: Readable::read(reader)?,
htlc_id: Readable::read(reader)?,
},
2 => HTLCUpdateAwaitingACK::FailHTLC {
}),
2 => holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::FailHTLC {
htlc_id: Readable::read(reader)?,
err_packet: Readable::read(reader)?,
},
}),
_ => return Err(DecodeError::InvalidValue),
});
}
}

let resend_order = match <u8 as Readable>::read(reader)? {
Expand Down Expand Up @@ -4398,7 +4390,7 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
let counterparty_shutdown_scriptpubkey = Readable::read(reader)?;
let commitment_secrets = Readable::read(reader)?;

Ok(Channel {
Ok((Channel {
user_id,

config,
Expand Down Expand Up @@ -4472,7 +4464,7 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
commitment_secrets,

network_sync: UpdateStatus::Fresh,
})
}, failed_htlcs))
}
}

Expand Down
17 changes: 13 additions & 4 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3871,14 +3871,20 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
let latest_block_height: u32 = Readable::read(reader)?;
let last_block_hash: BlockHash = Readable::read(reader)?;

let mut failed_htlcs = Vec::new();
let mut perm_failed_htlcs = Vec::new();
let mut holding_cell_failed_htlcs = Vec::new();

let channel_count: u64 = Readable::read(reader)?;
let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128));
let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
for _ in 0..channel_count {
let mut channel: Channel<ChanSigner> = Readable::read(reader)?;
let channel_and_failed_htlcs: (Channel<ChanSigner>, Vec<(HTLCSource, PaymentHash)>) = Readable::read(reader)?;
let (mut channel, chan_failed_htlcs) = channel_and_failed_htlcs;
for (_, ref payment_hash) in chan_failed_htlcs.iter() {
log_trace!(args.logger, "Going to fail HTLC with hash {} which was pending-forwarding when we were serialized.", log_bytes!(&payment_hash.0[..]));
}
holding_cell_failed_htlcs.push((channel.channel_id(), chan_failed_htlcs));
if channel.last_block_connected != Default::default() && channel.last_block_connected != last_block_hash {
return Err(DecodeError::InvalidValue);
}
Expand All @@ -3898,7 +3904,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
// But if the channel is behind of the monitor, close the channel:
let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true);
failed_htlcs.append(&mut new_failed_htlcs);
perm_failed_htlcs.append(&mut new_failed_htlcs);
monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
} else {
if let Some(short_channel_id) = channel.get_short_channel_id() {
Expand Down Expand Up @@ -3993,9 +3999,12 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
default_configuration: args.default_config,
};

for htlc_source in failed_htlcs.drain(..) {
for htlc_source in perm_failed_htlcs.drain(..) {
channel_manager.fail_htlc_backwards_internal(channel_manager.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
}
for (chan_id, htlcs) in holding_cell_failed_htlcs.drain(..) {
channel_manager.fail_holding_cell_htlcs(htlcs, chan_id);
}

//TODO: Broadcast channel update for closed channels, but only after we've made a
//connection or two.
Expand Down