
Commit 4369a0f
Reload pending payments from ChannelMonitor HTLC data on reload
If we go to send a payment, add the HTLC(s) to the channel(s), commit the ChannelMonitor updates to disk, and then crash, we'll come back up with no pending payments but HTLC(s) ready to be claimed/failed. This makes it rather impractical to write a payment sender/retryer, as you cannot guarantee atomicity - you cannot guarantee you'll have retry data persisted even if the HTLC(s) are actually pending. Because ChannelMonitors are *the* atomically-persisted data in LDK, we lean on their current HTLC data to figure out which HTLC(s) are part of an outbound payment, rebuilding the pending payments list on reload.
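The rebuild this commit implements, in miniature: treat the monitor's in-flight HTLC set as the ground truth and derive the pending-payments map from it, rather than trusting a ChannelManager snapshot that may predate the send. A self-contained sketch of that idea, using hypothetical stand-in types (PaymentId and MonitorHtlc here are illustrative, not LDK's real API):

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct PaymentId(u64);

    /// A pending outbound HTLC as recorded in the atomically-persisted monitor.
    struct MonitorHtlc {
        payment_id: PaymentId,
        amount_msat: u64,
    }

    /// Rebuild the pending-payments map from monitor data, summing the amount
    /// still in flight per payment id.
    fn rebuild_pending_payments(monitor_htlcs: &[MonitorHtlc]) -> HashMap<PaymentId, u64> {
        let mut pending = HashMap::new();
        for htlc in monitor_htlcs {
            *pending.entry(htlc.payment_id).or_insert(0) += htlc.amount_msat;
        }
        pending
    }

    fn main() {
        let htlcs = [
            MonitorHtlc { payment_id: PaymentId(1), amount_msat: 100_000 },
            MonitorHtlc { payment_id: PaymentId(1), amount_msat: 50_000 }, // second path of the same payment
            MonitorHtlc { payment_id: PaymentId(2), amount_msat: 25_000 },
        ];
        let pending = rebuild_pending_payments(&htlcs);
        assert_eq!(pending[&PaymentId(1)], 150_000);
        assert_eq!(pending[&PaymentId(2)], 25_000);
    }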
1 parent 9a8b01d

3 files changed: +279 additions, −6 deletions

lightning/src/chain/channelmonitor.rs

Lines changed: 71 additions & 0 deletions
@@ -1475,6 +1475,77 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
 
 		res
 	}
+
+	/// Gets the set of outbound HTLCs which are pending resolution in this channel.
+	/// This is used to reconstruct pending outbound payments on restart in the ChannelManager.
+	pub(crate) fn get_pending_htlcs(&self) -> HashMap<HTLCSource, HTLCOutputInCommitment> {
+		let mut res = HashMap::new();
+		let us = self.inner.lock().unwrap();
+
+		macro_rules! walk_htlcs {
+			($holder_commitment: expr, $htlc_iter: expr) => {
+				for (htlc, source) in $htlc_iter {
+					if us.htlcs_resolved_on_chain.iter().any(|v| Some(v.input_idx) == htlc.transaction_output_index) {
+						assert!(us.funding_spend_confirmed.is_some());
+					} else if htlc.offered == $holder_commitment {
+						// If the payment was outbound, check if there's an HTLCUpdate
+						// indicating we have spent this HTLC with a timeout, claiming it back
+						// and awaiting confirmations on it.
+						let htlc_update_confd = us.onchain_events_awaiting_threshold_conf.iter().find_map(|event| {
+							if let OnchainEvent::HTLCUpdate { input_idx: Some(input_idx), .. } = event.event {
+								if Some(input_idx) == htlc.transaction_output_index &&
+									us.best_block.height() >= event.height + ANTI_REORG_DELAY - 1
+								{ Some(()) } else { None }
+							} else { None }
+						});
+						if htlc_update_confd.is_none() {
+							res.insert(source.clone(), htlc.clone());
+						}
+					}
+				}
+			}
+		}
+
+		if let Some(txid) = us.funding_spend_confirmed {
+			if Some(txid) == us.current_counterparty_commitment_txid || Some(txid) == us.prev_counterparty_commitment_txid {
+				walk_htlcs!(false, us.counterparty_claimable_outpoints.get(&txid).unwrap().iter().filter_map(|(a, b)| {
+					if let &Some(ref source) = b {
+						Some((a, &**source))
+					} else { None }
+				}));
+			} else if txid == us.current_holder_commitment_tx.txid {
+				walk_htlcs!(true, us.current_holder_commitment_tx.htlc_outputs.iter().filter_map(|(a, _, c)| {
+					if let Some(source) = c { Some((a, source)) } else { None }
+				}));
+			} else if let Some(prev_commitment) = &us.prev_holder_signed_commitment_tx {
+				if txid == prev_commitment.txid {
+					walk_htlcs!(true, prev_commitment.htlc_outputs.iter().filter_map(|(a, _, c)| {
+						if let Some(source) = c { Some((a, source)) } else { None }
+					}));
+				}
+			}
+		} else {
+			macro_rules! check_htlc_fails {
+				($txid: expr, $commitment_tx: expr) => {
+					if let Some(ref latest_outpoints) = us.counterparty_claimable_outpoints.get($txid) {
+						for &(ref htlc, ref source_option) in latest_outpoints.iter() {
+							if let &Some(ref source) = source_option {
+								res.insert((**source).clone(), htlc.clone());
+							}
+						}
+					}
+				}
+			}
+			if let Some(ref txid) = us.current_counterparty_commitment_txid {
+				check_htlc_fails!(txid, "current");
+			}
+			if let Some(ref txid) = us.prev_counterparty_commitment_txid {
+				check_htlc_fails!(txid, "previous");
+			}
+		}
+
+		res
+	}
 }
 
 /// Compares a broadcasted commitment transaction's HTLCs with those in the latest state,
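For intuition, the per-HTLC decision `get_pending_htlcs` makes above boils down to: an outbound HTLC is still pending unless it was already resolved on chain, or a timeout-claim of it has been buried past the reorg-safety window. A stripped-down standalone sketch of that check (the `Htlc` type is hypothetical; LDK's real code walks commitment-transaction data, and its ANTI_REORG_DELAY constant is 6):

    const ANTI_REORG_DELAY: u32 = 6;

    struct Htlc {
        output_index: u32,
        // Height at which a timeout-claim of this HTLC confirmed, if any.
        timeout_spend_height: Option<u32>,
    }

    fn is_still_pending(htlc: &Htlc, resolved_indices: &[u32], best_height: u32) -> bool {
        if resolved_indices.contains(&htlc.output_index) {
            return false; // already fully resolved on chain
        }
        match htlc.timeout_spend_height {
            // Timeout-claim confirmed and past the reorg-safety window: no longer pending.
            Some(h) if best_height >= h + ANTI_REORG_DELAY - 1 => false,
            _ => true,
        }
    }

    fn main() {
        let htlc = Htlc { output_index: 0, timeout_spend_height: Some(100) };
        assert!(is_still_pending(&htlc, &[], 102));  // still within the reorg window
        assert!(!is_still_pending(&htlc, &[], 105)); // buried deep enough, dropped
    }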

lightning/src/ln/channelmanager.rs

Lines changed: 62 additions & 2 deletions
@@ -145,7 +145,7 @@ pub(super) enum HTLCForwardInfo {
 }
 
 /// Tracks the inbound corresponding to an outbound HTLC
-#[derive(Clone, PartialEq)]
+#[derive(Clone, Hash, PartialEq, Eq)]
 pub(crate) struct HTLCPreviousHopData {
 	short_channel_id: u64,
 	htlc_id: u64,
@@ -189,7 +189,7 @@ impl Readable for PaymentId {
 }
 }
 /// Tracks the inbound corresponding to an outbound HTLC
-#[derive(Clone, PartialEq)]
+#[derive(Clone, PartialEq, Eq)]
 pub(crate) enum HTLCSource {
 	PreviousHopData(HTLCPreviousHopData),
 	OutboundRoute {
@@ -202,6 +202,23 @@ pub(crate) enum HTLCSource {
 		payment_secret: Option<PaymentSecret>,
 	},
 }
+impl core::hash::Hash for HTLCSource {
+	fn hash<H: core::hash::Hasher>(&self, hasher: &mut H) {
+		match self {
+			HTLCSource::PreviousHopData(prev_hop_data) => {
+				0u8.hash(hasher);
+				prev_hop_data.hash(hasher);
+			},
+			HTLCSource::OutboundRoute { path, session_priv, payment_id, payment_secret, first_hop_htlc_msat: _ } => {
+				1u8.hash(hasher);
+				path.hash(hasher);
+				session_priv[..].hash(hasher);
+				payment_id.hash(hasher);
+				payment_secret.hash(hasher);
+			},
+		}
+	}
+}
 #[cfg(test)]
 impl HTLCSource {
 	pub fn dummy() -> Self {
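The manual `Hash` impl above (rather than a derive, presumably because the secret-key field only hashes via its byte slice, hence `session_priv[..]`) prefixes each variant with a distinct discriminant byte (0u8/1u8) so two variants whose fields happen to hash identically still produce different hashes. The same technique on a toy enum:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    enum Source {
        Inbound(u64),
        Outbound(u64),
    }

    impl Hash for Source {
        fn hash<H: Hasher>(&self, hasher: &mut H) {
            match self {
                // The leading byte keeps identical payloads in different
                // variants from colliding.
                Source::Inbound(id) => { 0u8.hash(hasher); id.hash(hasher); },
                Source::Outbound(id) => { 1u8.hash(hasher); id.hash(hasher); },
            }
        }
    }

    fn hash_of(v: &Source) -> u64 {
        let mut h = DefaultHasher::new();
        v.hash(&mut h);
        h.finish()
    }

    fn main() {
        // Same payload, different variants: the discriminant keeps them apart.
        assert_ne!(hash_of(&Source::Inbound(42)), hash_of(&Source::Outbound(42)));
    }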
@@ -5841,6 +5858,49 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 				outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
 			}
 			pending_outbound_payments = Some(outbounds);
+		} else {
+			// If we're tracking pending payments, ensure we haven't lost any by looking at the
+			// ChannelMonitor data for any channels for which we do not have authoritative state
+			// (i.e. those for which we just force-closed above or we otherwise don't have a
+			// corresponding `Channel` at all).
+			// This avoids several edge-cases where we would otherwise "forget" about pending
+			// payments which are still in-flight via their on-chain state.
+			// We only rebuild the pending payments map if we were most recently serialized by
+			// 0.0.102+.
+			for (_, monitor) in args.channel_monitors {
+				if by_id.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
+					for (htlc_source, htlc) in monitor.get_pending_htlcs() {
+						if let HTLCSource::OutboundRoute { payment_id, session_priv, path, payment_secret, .. } = htlc_source {
+							if path.is_empty() {
+								log_error!(args.logger, "Got an empty path for a pending payment");
+								return Err(DecodeError::InvalidValue);
+							}
+							let path_amt = path.last().unwrap().fee_msat;
+							let mut session_priv_bytes = [0; 32];
+							session_priv_bytes[..].copy_from_slice(&session_priv[..]);
+							match pending_outbound_payments.as_mut().unwrap().entry(payment_id) {
+								hash_map::Entry::Occupied(mut entry) => {
+									let readded = entry.get_mut().insert(session_priv_bytes, path_amt);
+									log_info!(args.logger, "{} a pending payment path for {} msat for session priv {} on an existing pending payment with payment hash {}",
+										if readded { "Added" } else { "Had" }, path_amt, log_bytes!(session_priv_bytes), log_bytes!(htlc.payment_hash.0));
+								},
+								hash_map::Entry::Vacant(entry) => {
+									entry.insert(PendingOutboundPayment::Retryable {
+										session_privs: [session_priv_bytes].iter().map(|a| *a).collect(),
+										payment_hash: htlc.payment_hash,
+										payment_secret,
+										pending_amt_msat: path_amt,
+										total_msat: path_amt,
+										starting_block_height: best_block_height,
+									});
+									log_info!(args.logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}",
+										path_amt, log_bytes!(htlc.payment_hash.0), log_bytes!(session_priv_bytes));
+								}
+							}
+						}
+					}
+				}
+			}
 		}
 
 		let mut secp_ctx = Secp256k1::new();
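The merge into `pending_outbound_payments` above is the classic `hash_map::Entry` pattern: an occupied entry has the rediscovered path's session key added to it, while a vacant one gets a fresh retryable payment created. The pattern in isolation (simplified hypothetical types, keyed by a bare u64 payment id):

    use std::collections::hash_map::{Entry, HashMap};
    use std::collections::HashSet;

    /// Records one payment path; returns true if it wasn't already tracked.
    fn record_path(pending: &mut HashMap<u64, HashSet<[u8; 32]>>, payment_id: u64, session_priv: [u8; 32]) -> bool {
        match pending.entry(payment_id) {
            // Payment already tracked: just note the (possibly already-known) path.
            Entry::Occupied(mut entry) => entry.get_mut().insert(session_priv),
            // First path seen for this payment: create the entry.
            Entry::Vacant(entry) => {
                entry.insert([session_priv].iter().copied().collect());
                true
            }
        }
    }

    fn main() {
        let mut pending = HashMap::new();
        assert!(record_path(&mut pending, 1, [7; 32]));  // newly created payment
        assert!(!record_path(&mut pending, 1, [7; 32])); // duplicate path, already known
        assert!(record_path(&mut pending, 1, [8; 32]));  // second path on the same payment
    }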

lightning/src/ln/payment_tests.rs

Lines changed: 146 additions & 4 deletions
@@ -11,18 +11,23 @@
 //! serialization ordering between ChannelManager/ChannelMonitors and ensuring we can still retry
 //! payments thereafter.
 
+use chain::Watch;
+use chain::channelmonitor::ChannelMonitor;
 use ln::{PaymentPreimage, PaymentHash};
-use ln::channelmanager::{PaymentId, PaymentSendFailure};
-use routing::router::get_route;
+use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, PaymentId, PaymentSendFailure};
 use ln::features::{InitFeatures, InvoiceFeatures};
 use ln::msgs;
-use ln::msgs::ChannelMessageHandler;
+use ln::msgs::{ChannelMessageHandler, ErrorAction};
+use routing::router::get_route;
 use util::test_utils;
-use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{ClosureReason, Event, MessageSendEvent, MessageSendEventsProvider};
 use util::errors::APIError;
+use util::enforcing_trait_impls::EnforcingSigner;
+use util::ser::{ReadableArgs, Writeable};
 
 use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hashes::Hash;
+use bitcoin::BlockHash;
 
 use prelude::*;
 
@@ -252,3 +257,140 @@ fn retry_on_init_fail() {
 
 	assert!(nodes[0].node.pending_outbound_payments.lock().unwrap().is_empty());
 }
+
+#[test]
+fn retry_with_no_persist() {
+	// If we send a pending payment and `send_payment` returns success, we should always either
+	// return a payment failure event or a payment success event, and on failure the payment should
+	// be retryable.
+	// In order to do so when the ChannelManager isn't immediately persisted (which is normal - it's
+	// always persisted asynchronously), the ChannelManager has to reload some payment data from
+	// ChannelMonitor(s) in some cases. This tests that reloading.
+	let chanmon_cfgs = create_chanmon_cfgs(3);
+	let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+	let persister: test_utils::TestPersister;
+	let new_chain_monitor: test_utils::TestChainMonitor;
+	let nodes_0_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
+	let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+	create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+	create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known());
+
+	// Serialize the ChannelManager prior to sending the payment
+	let nodes_0_serialized = nodes[0].node.encode();
+
+	let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[2], 100_000);
+	let payment_id = nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret)).unwrap();
+	check_added_monitors!(nodes[0], 1);
+
+	let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+	assert_eq!(events.len(), 1);
+	let payment_event = SendEvent::from_event(events.pop().unwrap());
+	assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id());
+
+	// We relay the payment to nodes[1] while it's disconnected from nodes[2], causing the payment
+	// to be returned immediately to nodes[0], without having nodes[2] fail the inbound payment
+	// which would prevent retry.
+	nodes[1].node.peer_disconnected(&nodes[2].node.get_our_node_id(), false);
+	nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+
+	nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
+	commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false, true);
+	// nodes[1] now immediately fails the HTLC as the next-hop channel is disconnected
+	let _ = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
+
+	reconnect_nodes(&nodes[1], &nodes[2], (true, true), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
+
+	// The ChannelMonitor should always be the latest version, as we're required to persist it
+	// during the `commitment_signed_dance!()`.
+	let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
+	nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
+
+	persister = test_utils::TestPersister::new();
+	let keys_manager = &chanmon_cfgs[0].keys_manager;
+	new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), nodes[0].logger, node_cfgs[0].fee_estimator, &persister, keys_manager);
+	nodes[0].chain_monitor = &new_chain_monitor;
+	let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
+	let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
+		&mut chan_0_monitor_read, keys_manager).unwrap();
+	assert!(chan_0_monitor_read.is_empty());
+
+	let mut nodes_0_read = &nodes_0_serialized[..];
+	let (_, nodes_0_deserialized_tmp) = {
+		let mut channel_monitors = HashMap::new();
+		channel_monitors.insert(chan_0_monitor.get_funding_txo().0, &mut chan_0_monitor);
+		<(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
+			default_config: test_default_channel_config(),
+			keys_manager,
+			fee_estimator: node_cfgs[0].fee_estimator,
+			chain_monitor: nodes[0].chain_monitor,
+			tx_broadcaster: nodes[0].tx_broadcaster.clone(),
+			logger: nodes[0].logger,
+			channel_monitors,
+		}).unwrap()
+	};
+	nodes_0_deserialized = nodes_0_deserialized_tmp;
+	assert!(nodes_0_read.is_empty());
+
+	assert!(nodes[0].chain_monitor.watch_channel(chan_0_monitor.get_funding_txo().0, chan_0_monitor).is_ok());
+	nodes[0].node = &nodes_0_deserialized;
+	check_added_monitors!(nodes[0], 1);
+
+	// On reload, the ChannelManager should realize it is stale compared to the ChannelMonitor and
+	// force-close the channel.
+	check_closed_event!(nodes[0], 1, ClosureReason::OutdatedChannelManager);
+	assert!(nodes[0].node.list_channels().is_empty());
+	assert!(!nodes[0].node.pending_outbound_payments.lock().unwrap().is_empty());
+	let as_commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+	assert_eq!(as_commitment_tx.len(), 1);
+
+	nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+	nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: InitFeatures::known()});
+	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+
+	// Now nodes[1] should send a channel reestablish, which nodes[0] will respond to with an
+	// error, as the channel has hit the chain.
+	nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: InitFeatures::known()});
+	let bs_reestablish = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id());
+	nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &bs_reestablish);
+	let as_err = nodes[0].node.get_and_clear_pending_msg_events();
+	assert_eq!(as_err.len(), 1);
+	match as_err[0] {
+		MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => {
+			assert_eq!(node_id, nodes[1].node.get_our_node_id());
+			nodes[1].node.handle_error(&nodes[0].node.get_our_node_id(), msg);
+			check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyForceClosed { peer_msg: "Failed to find corresponding channel".to_string() });
+			check_added_monitors!(nodes[1], 1);
+			assert_eq!(nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0).len(), 1);
+		},
+		_ => panic!("Unexpected event"),
+	}
+	check_closed_broadcast!(nodes[1], false);
+
+	// Create a new channel on which to retry the payment before we fail the payment via the
+	// HTLC-Timeout transaction. This avoids ChannelManager timing out the payment due to us
+	// connecting several blocks while creating the channel (implying time has passed).
+	create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+	assert_eq!(nodes[0].node.list_usable_channels().len(), 1);
+
+	mine_transaction(&nodes[0], &as_commitment_tx[0]);
+	connect_blocks(&nodes[0], TEST_FINAL_CLTV*4 + 20);
+	let as_htlc_timeout_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+	assert_eq!(as_htlc_timeout_tx.len(), 1);
+	confirm_transaction(&nodes[0], &as_htlc_timeout_tx[0]);
+	nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clear();
+	expect_payment_failed!(nodes[0], payment_hash, false);
+
+	// Finally, retry the payment (which was reloaded from the ChannelMonitor when nodes[0] was
+	// reloaded) via a route over the new channel, which will work without issue and eventually be
+	// received and claimed at the recipient just like any other payment.
+	let (new_route, _, _, _) = get_route_and_payment_hash!(nodes[0], nodes[2], 100_000);
+
+	nodes[0].node.retry_payment(&new_route, payment_id).unwrap();
+	check_added_monitors!(nodes[0], 1);
+	let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+	assert_eq!(events.len(), 1);
+	pass_along_path(&nodes[0], &[&nodes[1], &nodes[2]], 100_000, payment_hash, Some(payment_secret), events.pop().unwrap(), true, None);
+	claim_payment_along_route(&nodes[0], &[&[&nodes[1], &nodes[2]]], false, payment_preimage);
+}
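Stripped of LDK specifics, the test's skeleton is: serialize early, "crash", reload from the stale snapshot plus the authoritative monitor data, and assert the in-flight payment is re-tracked. A toy model of that invariant (hypothetical types; the real test uses the ChannelManagerReadArgs machinery shown above):

    use std::collections::HashMap;

    #[derive(Clone)]
    struct Snapshot { pending: HashMap<u64, u64> } // payment_id -> msat in flight

    /// Reload from a possibly-stale snapshot, re-adding anything the monitor
    /// still considers in flight.
    fn reload(stale: &Snapshot, monitor_pending: &HashMap<u64, u64>) -> Snapshot {
        let mut pending = stale.pending.clone();
        for (id, amt) in monitor_pending {
            pending.entry(*id).or_insert(*amt);
        }
        Snapshot { pending }
    }

    fn main() {
        let before_send = Snapshot { pending: HashMap::new() }; // serialized pre-payment
        let mut monitor = HashMap::new();
        monitor.insert(1u64, 100_000u64); // the HTLC committed to the monitor
        let reloaded = reload(&before_send, &monitor);
        assert_eq!(reloaded.pending[&1], 100_000); // the payment was not forgotten
    }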
