Commit 60e30b8

Replay MPP claims via background events using new CM metadata
When we claim an MPP payment, then crash before persisting all the relevant `ChannelMonitor`s, we rely on the payment data being available in the `ChannelManager` on restart to re-claim any parts that haven't yet been claimed. This is fine as long as the `ChannelManager` was persisted before the `PaymentClaimable` event was processed, which is generally the case in our `lightning-background-processor`, but may not be in other cases or in a somewhat rare race.

In order to fix this, we need to track where all the MPP parts of a payment are in the `ChannelMonitor`, allowing us to re-claim any missing pieces without reference to any `ChannelManager` data.

Further, in order to properly generate a `PaymentClaimed` event against the re-started claim, we have to store various payment metadata with the HTLC list as well.

Here we finally implement claiming using the new MPP part list and metadata stored in `ChannelMonitor`s. In doing so, we use much more of the existing HTLC-claiming pipeline in `ChannelManager`, utilizing the on-startup background events flow as well as properly re-applying the RAA-blockers to ensure preimages cannot be lost.
1 parent 7a60be2 commit 60e30b8
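
In short, the restart path added below walks the preimages stored in each `ChannelMonitor` together with the newly-tracked MPP claim metadata and re-drives every MPP part through the normal claim pipeline, falling back to the older `ChannelManager`-based re-claim logic when no metadata is present. The following is a heavily condensed sketch of that control flow (not the literal code; names are taken from the `channelmanager.rs` diff below, while locking, error handling, and the `PaymentClaimed` completion action are elided):

// Condensed illustration of the new ChannelManager::read replay loop; see the
// channelmanager.rs diff below for the real implementation.
for (_, monitor) in args.channel_monitors.iter() {
    for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
        if !payment_claims.is_empty() {
            // New path: the monitor recorded which MPP parts still need the preimage, so
            // each part is replayed through the normal claim pipeline, queueing monitor
            // updates as startup background events and re-applying the RAA blockers.
            for payment_claim in payment_claims {
                for part in payment_claim.mpp_parts.iter() {
                    // `completion_action` (which regenerates the PaymentClaimed event) is elided here.
                    channel_manager.claim_mpp_part(part.into(), payment_preimage, None, completion_action);
                }
            }
        } else {
            // Legacy path (no MPP metadata stored): rely on the payment still being
            // present in the ChannelManager's claimable-payments set, as before.
        }
    }
}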

3 files changed (+195 lines, -75 lines)

lightning/src/ln/channel.rs

Lines changed: 1 addition & 1 deletion

@@ -1290,7 +1290,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
 // further `send_update_fee` calls, dropping the previous holding cell update entirely.
 holding_cell_update_fee: Option<u32>,
 next_holder_htlc_id: u64,
-next_counterparty_htlc_id: u64,
+pub(super) next_counterparty_htlc_id: u64,
 feerate_per_kw: u32,
 
 /// The timestamp set on our latest `channel_update` message for this channel. It is updated

lightning/src/ln/channelmanager.rs

Lines changed: 172 additions & 69 deletions

@@ -1125,6 +1125,24 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 }
 );
 
+struct HTLCClaimSource {
+counterparty_node_id: Option<PublicKey>,
+funding_txo: OutPoint,
+channel_id: ChannelId,
+htlc_id: u64,
+}
+
+impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
+fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
+HTLCClaimSource {
+counterparty_node_id: Some(o.counterparty_node_id),
+funding_txo: o.funding_txo,
+channel_id: o.channel_id,
+htlc_id: o.htlc_id,
+}
+}
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 struct MPPClaimHTLCSource {
 counterparty_node_id: PublicKey,
@@ -6898,6 +6916,27 @@
 >(
 &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
 payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
+) {
+let counterparty_node_id =
+match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
+Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
+None => None
+};
+
+let htlc_source = HTLCClaimSource {
+counterparty_node_id,
+funding_txo: prev_hop.outpoint,
+channel_id: prev_hop.channel_id,
+htlc_id: prev_hop.htlc_id,
+};
+self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
+}
+
+fn claim_mpp_part<
+ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+>(
+&self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
+payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
 ) {
 //TODO: Delay the claimed_funds relaying just like we do outbound relay!
 

@@ -6914,12 +6953,8 @@
 {
 let per_peer_state = self.per_peer_state.read().unwrap();
 let chan_id = prev_hop.channel_id;
-let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
-Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
-None => None
-};
 
-let peer_state_opt = counterparty_node_id_opt.as_ref().map(
+let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map(
 |counterparty_node_id| per_peer_state.get(counterparty_node_id)
 .map(|peer_mutex| peer_mutex.lock().unwrap())
 ).unwrap_or(None);
@@ -6946,7 +6981,7 @@
 peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
 }
 if !during_init {
-handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock,
 peer_state, per_peer_state, chan);
 } else {
 // If we're running during init we cannot update a monitor directly -

@@ -6955,7 +6990,7 @@
 self.pending_background_events.lock().unwrap().push(
 BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 counterparty_node_id,
-funding_txo: prev_hop.outpoint,
+funding_txo: prev_hop.funding_txo,
 channel_id: prev_hop.channel_id,
 update: monitor_update.clone(),
 });

@@ -7029,7 +7064,7 @@
 }
 let preimage_update = ChannelMonitorUpdate {
 update_id: CLOSED_CHANNEL_UPDATE_ID,
-counterparty_node_id: None,
+counterparty_node_id: prev_hop.counterparty_node_id,
 updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
 payment_preimage,
 payment_info,

@@ -7040,7 +7075,7 @@
 if !during_init {
 // We update the ChannelMonitor on the backward link, after
 // receiving an `update_fulfill_htlc` from the forward link.
-let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo, &preimage_update);
 if update_res != ChannelMonitorUpdateStatus::Completed {
 // TODO: This needs to be handled somehow - if we receive a monitor update
 // with a preimage we *must* somehow manage to propagate it to the upstream

@@ -7063,7 +7098,7 @@
 // complete the monitor update completion action from `completion_action`.
 self.pending_background_events.lock().unwrap().push(
 BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
-prev_hop.outpoint, prev_hop.channel_id, preimage_update,
+prev_hop.funding_txo, prev_hop.channel_id, preimage_update,
 )));
 }
 // Note that we do process the completion action here. This totally could be a
@@ -7314,7 +7349,7 @@
 onion_fields,
 payment_id,
 }) = payment {
-self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
+let event = events::Event::PaymentClaimed {
 payment_hash,
 purpose,
 amount_msat,

@@ -7323,7 +7358,16 @@
 sender_intended_total_msat,
 onion_fields,
 payment_id,
-}, None));
+};
+let event_action = (event, None);
+let mut pending_events = self.pending_events.lock().unwrap();
+// If we're replaying a claim on startup we may end up duplicating an event
+// that's already in our queue, so check before we push another one. The
+// `payment_id` should suffice to ensure we never spuriously drop a second
+// event for a duplicate payment.
+if !pending_events.contains(&event_action) {
+pending_events.push_back(event_action);
+}
 }
 },
 MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
@@ -13132,67 +13176,126 @@
 };
 
 for (_, monitor) in args.channel_monitors.iter() {
-for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
-let per_peer_state = channel_manager.per_peer_state.read().unwrap();
-let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
-let payment = claimable_payments.claimable_payments.remove(&payment_hash);
-mem::drop(claimable_payments);
-if let Some(payment) = payment {
-log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
-let mut claimable_amt_msat = 0;
-let mut receiver_node_id = Some(our_network_pubkey);
-let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
-if phantom_shared_secret.is_some() {
-let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
-.expect("Failed to get node_id for phantom node recipient");
-receiver_node_id = Some(phantom_pubkey)
-}
-for claimable_htlc in &payment.htlcs {
-claimable_amt_msat += claimable_htlc.value;
-
-// Add a holding-cell claim of the payment to the Channel, which should be
-// applied ~immediately on peer reconnection. Because it won't generate a
-// new commitment transaction we can just provide the payment preimage to
-// the corresponding ChannelMonitor and nothing else.
-//
-// We do so directly instead of via the normal ChannelMonitor update
-// procedure as the ChainMonitor hasn't yet been initialized, implying
-// we're not allowed to call it directly yet. Further, we do the update
-// without incrementing the ChannelMonitor update ID as there isn't any
-// reason to.
-// If we were to generate a new ChannelMonitor update ID here and then
-// crash before the user finishes block connect we'd end up force-closing
-// this channel as well. On the flip side, there's no harm in restarting
-// without the new monitor persisted - we'll end up right back here on
-// restart.
-let previous_channel_id = claimable_htlc.prev_hop.channel_id;
-let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
-.get(&claimable_htlc.prev_hop.outpoint).cloned();
-if let Some(peer_node_id) = peer_node_id_opt {
-let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
-let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-let peer_state = &mut *peer_state_lock;
-if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
-let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
-channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
+if !payment_claims.is_empty() {
+for payment_claim in payment_claims {
+if payment_claim.mpp_parts.is_empty() {
+return Err(DecodeError::InvalidValue);
+}
+let pending_claims = PendingMPPClaim {
+channels_without_preimage: payment_claim.mpp_parts.clone(),
+channels_with_preimage: Vec::new(),
+};
+let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+
+// While it may be duplicative to generate a PaymentClaimed here, trying to
+// figure out if the user definitely saw it before shutdown would require some
+// nontrivial logic and may break as we move away from regularly persisting
+// ChannelManager. Instead, we rely on the users' event handler being
+// idempotent and just blindly generate one no matter what, letting the
+// preimages eventually timing out from ChannelMonitors to prevent us from
+// doing so forever.
+
+let claim_found =
+channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
+payment_hash, &channel_manager.node_signer, &channel_manager.logger,
+&channel_manager.inbound_payment_id_secret, |_| Ok(()),
+);
+if claim_found.is_err() {
+let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+match claimable_payments.pending_claiming_payments.entry(payment_hash) {
+hash_map::Entry::Occupied(_) => {
+debug_assert!(false, "Entry was added in begin_claiming_payment");
+return Err(DecodeError::InvalidValue);
+},
+hash_map::Entry::Vacant(entry) => {
+entry.insert(payment_claim.claiming_payment);
+},
 }
 }
-if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
-previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+
+for part in payment_claim.mpp_parts.iter() {
+let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
+part.counterparty_node_id, part.channel_id, part.htlc_id,
+PendingMPPClaimPointer(Arc::clone(&ptr))
+));
+let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
+RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
+}
+);
+// Note that we don't need to pass the `payment_info` here - its
+// already (clearly) durably on disk in the `ChannelMonitor` so there's
+// no need to worry about getting it into others.
+channel_manager.claim_mpp_part(
+part.into(), payment_preimage, None,
+|_, _|
+(Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
+);
 }
 }
-let mut pending_events = channel_manager.pending_events.lock().unwrap();
-let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
-pending_events.push_back((events::Event::PaymentClaimed {
-receiver_node_id,
-payment_hash,
-purpose: payment.purpose,
-amount_msat: claimable_amt_msat,
-htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
-sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
-onion_fields: payment.onion_fields,
-payment_id: Some(payment_id),
-}, None));
+} else {
+let per_peer_state = channel_manager.per_peer_state.read().unwrap();
+let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+let payment = claimable_payments.claimable_payments.remove(&payment_hash);
+mem::drop(claimable_payments);
+if let Some(payment) = payment {
+log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
+let mut claimable_amt_msat = 0;
+let mut receiver_node_id = Some(our_network_pubkey);
+let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
+if phantom_shared_secret.is_some() {
+let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
+.expect("Failed to get node_id for phantom node recipient");
+receiver_node_id = Some(phantom_pubkey)
+}
+for claimable_htlc in &payment.htlcs {
+claimable_amt_msat += claimable_htlc.value;
+
+// Add a holding-cell claim of the payment to the Channel, which should be
+// applied ~immediately on peer reconnection. Because it won't generate a
+// new commitment transaction we can just provide the payment preimage to
+// the corresponding ChannelMonitor and nothing else.
+//
+// We do so directly instead of via the normal ChannelMonitor update
+// procedure as the ChainMonitor hasn't yet been initialized, implying
+// we're not allowed to call it directly yet. Further, we do the update
+// without incrementing the ChannelMonitor update ID as there isn't any
+// reason to.
+// If we were to generate a new ChannelMonitor update ID here and then
+// crash before the user finishes block connect we'd end up force-closing
+// this channel as well. On the flip side, there's no harm in restarting
+// without the new monitor persisted - we'll end up right back here on
+// restart.
+let previous_channel_id = claimable_htlc.prev_hop.channel_id;
+let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
+.get(&claimable_htlc.prev_hop.outpoint).cloned();
+if let Some(peer_node_id) = peer_node_id_opt {
+let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
+let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+let peer_state = &mut *peer_state_lock;
+if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
+let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
+channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+}
+}
+if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+}
+}
+let mut pending_events = channel_manager.pending_events.lock().unwrap();
+let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
+pending_events.push_back((events::Event::PaymentClaimed {
+receiver_node_id,
+payment_hash,
+purpose: payment.purpose,
+amount_msat: claimable_amt_msat,
+htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
+sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
+onion_fields: payment.onion_fields,
+payment_id: Some(payment_id),
+}, None));
+}
 }
 }
 }

lightning/src/ln/reload_tests.rs

Lines changed: 22 additions & 5 deletions

@@ -878,27 +878,39 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) {
 // Now restart nodes[3].
 reload_node!(nodes[3], original_manager, &[&updated_monitor.0, &original_monitor.0], persister, new_chain_monitor, nodes_3_deserialized);
 
-// On startup the preimage should have been copied into the non-persisted monitor:
+// Until the startup background events are processed (in `get_and_clear_pending_events`,
+// below), the preimage is not copied to the non-persisted monitor...
 assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
-assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+assert_eq!(
+get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash),
+persist_both_monitors,
+);
 
 nodes[1].node.peer_disconnected(nodes[3].node.get_our_node_id());
 nodes[2].node.peer_disconnected(nodes[3].node.get_our_node_id());
 
 // During deserialization, we should have closed one channel and broadcast its latest
 // commitment transaction. We should also still have the original PaymentClaimable event we
-// never finished processing.
+// never finished processing as well as a PaymentClaimed event regenerated when we replayed the
+// preimage onto the non-persisted monitor.
 let events = nodes[3].node.get_and_clear_pending_events();
 assert_eq!(events.len(), if persist_both_monitors { 4 } else { 3 });
 if let Event::PaymentClaimable { amount_msat: 15_000_000, .. } = events[0] { } else { panic!(); }
 if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[1] { } else { panic!(); }
 if persist_both_monitors {
 if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[2] { } else { panic!(); }
-check_added_monitors(&nodes[3], 2);
+if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[3] { } else { panic!(); }
+check_added_monitors(&nodes[3], 6);
 } else {
-check_added_monitors(&nodes[3], 1);
+if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[2] { } else { panic!(); }
+check_added_monitors(&nodes[3], 3);
 }
 
+// Now that we've processed background events, the preimage should have been copied into the
+// non-persisted monitor:
+assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
+assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+
 // On restart, we should also get a duplicate PaymentClaimed event as we persisted the
 // ChannelManager prior to handling the original one.
 if let Event::PaymentClaimed { payment_hash: our_payment_hash, amount_msat: 15_000_000, .. } =

@@ -948,6 +960,11 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) {
 nodes[0].node.handle_update_fulfill_htlc(nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
 commitment_signed_dance!(nodes[0], nodes[2], cs_updates.commitment_signed, false, true);
 expect_payment_sent!(nodes[0], payment_preimage);
+
+// Ensure that the remaining channel is fully operation and not blocked (and that after a
+// cycle of commitment updates the payment preimage is ultimately pruned).
+send_payment(&nodes[0], &[&nodes[2], &nodes[3]], 100_000);
+assert!(!get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
 }
 }
 