Commit 84731e4

Replay MPP claims via background events using new CM metadata
When we claim an MPP payment, then crash before persisting all the relevant `ChannelMonitor`s, we rely on the payment data being available in the `ChannelManager` on restart to re-claim any parts that haven't yet been claimed. This is fine as long as the `ChannelManager` was persisted before the `PaymentClaimable` event was processed, which is generally the case in our `lightning-background-processor`, but may not be in other cases or in a somewhat rare race.

In order to fix this, we need to track where all the MPP parts of a payment are in the `ChannelMonitor`, allowing us to re-claim any missing pieces without reference to any `ChannelManager` data.

Further, in order to properly generate a `PaymentClaimed` event against the re-started claim, we have to store various payment metadata with the HTLC list as well.

Here we finally implement claiming using the new MPP part list and metadata stored in `ChannelMonitor`s. In doing so, we use much more of the existing HTLC-claiming pipeline in `ChannelManager`, utilizing the on-startup background events flow as well as properly re-applying the RAA-blockers to ensure preimages cannot be lost.
1 parent da6e569 commit 84731e4
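
At a high level, the change stores, next to each preimage in the `ChannelMonitor`, the full list of MPP parts (plus enough payment metadata to rebuild a `PaymentClaimed` event), and on restart replays a claim for every part through the normal HTLC-claim pipeline. A minimal sketch of that idea, using invented stand-in types rather than the real LDK structures (`MPPClaimHTLCSource`, `PaymentClaimDetails`, and friends):

// Simplified stand-ins for the monitor-side claim metadata; the real LDK types
// carry more fields (funding outpoint, payment purpose, onion fields, ...).
#[derive(Clone, Debug)]
struct MppPart {
	counterparty_node_id: [u8; 33],
	channel_id: [u8; 32],
	htlc_id: u64,
}

#[derive(Clone, Debug)]
struct StoredClaim {
	payment_hash: [u8; 32],
	payment_preimage: [u8; 32],
	// Every part of the MPP payment, so the claim can be replayed even if the
	// manager lost track of some of them before the crash.
	mpp_parts: Vec<MppPart>,
}

/// Replay each part of every stored claim on startup. `replay_part` stands in
/// for handing the preimage back through the normal HTLC-claim pipeline.
fn replay_stored_claims<F: FnMut(&StoredClaim, &MppPart)>(
	claims: &[StoredClaim], mut replay_part: F,
) {
	for claim in claims {
		for part in &claim.mpp_parts {
			replay_part(claim, part);
		}
	}
}

fn main() {
	let claims = vec![StoredClaim {
		payment_hash: [1; 32],
		payment_preimage: [2; 32],
		mpp_parts: vec![
			MppPart { counterparty_node_id: [3; 33], channel_id: [4; 32], htlc_id: 0 },
			MppPart { counterparty_node_id: [5; 33], channel_id: [6; 32], htlc_id: 7 },
		],
	}];
	replay_stored_claims(&claims, |claim, part| {
		println!("replaying htlc {} on channel {:?} for payment {:?}",
			part.htlc_id, &part.channel_id[..4], &claim.payment_hash[..4]);
	});
}

The point is that everything needed to finish the claim lives in the monitor, so no `ChannelManager` state from before the crash is required.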


3 files changed: +190 -75 lines changed


lightning/src/ln/channel.rs

+1 -1
@@ -1288,7 +1288,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
 	// further `send_update_fee` calls, dropping the previous holding cell update entirely.
 	holding_cell_update_fee: Option<u32>,
 	next_holder_htlc_id: u64,
-	next_counterparty_htlc_id: u64,
+	pub(super) next_counterparty_htlc_id: u64,
 	feerate_per_kw: u32,
 
 	/// The timestamp set on our latest `channel_update` message for this channel. It is updated

lightning/src/ln/channelmanager.rs

+172 -69
@@ -1117,6 +1117,24 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 	}
 );
 
+struct HTLCClaimSource {
+	counterparty_node_id: Option<PublicKey>,
+	funding_txo: OutPoint,
+	channel_id: ChannelId,
+	htlc_id: u64,
+}
+
+impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
+	fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
+		HTLCClaimSource {
+			counterparty_node_id: Some(o.counterparty_node_id),
+			funding_txo: o.funding_txo,
+			channel_id: o.channel_id,
+			htlc_id: o.htlc_id,
+		}
+	}
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 struct MPPClaimHTLCSource {
 	counterparty_node_id: PublicKey,
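
The `From<&MPPClaimHTLCSource>` impl above is what lets the startup-replay code later in this commit hand monitor-derived parts straight to `claim_mpp_part` as `part.into()`. The same conversion pattern in isolation, with toy types standing in for the LDK ones:

struct StoredPart { node_id: u8, htlc_id: u64 }
struct ClaimSource { node_id: Option<u8>, htlc_id: u64 }

impl From<&StoredPart> for ClaimSource {
	// Stored parts always know their counterparty, so the conversion simply
	// wraps it in `Some`, mirroring `HTLCClaimSource::counterparty_node_id`.
	fn from(p: &StoredPart) -> ClaimSource {
		ClaimSource { node_id: Some(p.node_id), htlc_id: p.htlc_id }
	}
}

fn claim(source: ClaimSource) {
	println!("claiming htlc {} from {:?}", source.htlc_id, source.node_id);
}

fn main() {
	let part = StoredPart { node_id: 7, htlc_id: 42 };
	claim((&part).into()); // the blanket `Into` impl comes for free from `From`
}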
@@ -6889,6 +6907,27 @@ where
 	>(
 		&self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
 		payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
+	) {
+		let counterparty_node_id =
+			match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
+				Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
+				None => None
+			};
+
+		let htlc_source = HTLCClaimSource {
+			counterparty_node_id,
+			funding_txo: prev_hop.outpoint,
+			channel_id: prev_hop.channel_id,
+			htlc_id: prev_hop.htlc_id,
+		};
+		self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
+	}
+
+	fn claim_mpp_part<
+		ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+	>(
+		&self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
+		payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
 	) {
 		//TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
@@ -6905,12 +6944,8 @@ where
 		{
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			let chan_id = prev_hop.channel_id;
-			let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
-				Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
-				None => None
-			};
 
-			let peer_state_opt = counterparty_node_id_opt.as_ref().map(
+			let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map(
 				|counterparty_node_id| per_peer_state.get(counterparty_node_id)
 				.map(|peer_mutex| peer_mutex.lock().unwrap())
 			).unwrap_or(None);
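
Taken together, the two hunks above turn `claim_funds_from_hop` into a thin shim: it resolves the counterparty from the short-channel-id map and hands everything to the new `claim_mpp_part`, which the replay-on-restart path can also call with a counterparty taken from monitor metadata instead. A rough sketch of that shape, with hypothetical types rather than the real method signatures:

use std::collections::HashMap;

#[derive(Clone, Copy, Debug)]
struct NodeId(u8);

struct PrevHop { short_channel_id: u64, htlc_id: u64 }
struct ClaimSource { counterparty: Option<NodeId>, htlc_id: u64 }

struct Manager { short_to_chan: HashMap<u64, NodeId> }

impl Manager {
	// The legacy entry point: look up who the counterparty is, then delegate.
	fn claim_funds_from_hop(&self, prev_hop: PrevHop) {
		let counterparty = self.short_to_chan.get(&prev_hop.short_channel_id).copied();
		self.claim_mpp_part(ClaimSource { counterparty, htlc_id: prev_hop.htlc_id });
	}

	// The shared worker: also callable on restart with a counterparty taken
	// from the monitor metadata instead of the SCID map.
	fn claim_mpp_part(&self, source: ClaimSource) {
		println!("claiming htlc {} via {:?}", source.htlc_id, source.counterparty);
	}
}

fn main() {
	let mgr = Manager { short_to_chan: HashMap::from([(42, NodeId(3))]) };
	mgr.claim_funds_from_hop(PrevHop { short_channel_id: 42, htlc_id: 0 });
}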
@@ -6937,7 +6972,7 @@ where
 						peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
 					}
 					if !during_init {
-						handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+						handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock,
 							peer_state, per_peer_state, chan);
 					} else {
 						// If we're running during init we cannot update a monitor directly -
@@ -6946,7 +6981,7 @@ where
 						self.pending_background_events.lock().unwrap().push(
 							BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 								counterparty_node_id,
-								funding_txo: prev_hop.outpoint,
+								funding_txo: prev_hop.funding_txo,
 								channel_id: prev_hop.channel_id,
 								update: monitor_update.clone(),
 							});
@@ -7020,7 +7055,7 @@ where
 		}
 		let preimage_update = ChannelMonitorUpdate {
 			update_id: CLOSED_CHANNEL_UPDATE_ID,
-			counterparty_node_id: None,
+			counterparty_node_id: prev_hop.counterparty_node_id,
 			updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
 				payment_preimage,
 				payment_info,
@@ -7031,7 +7066,7 @@ where
 		if !during_init {
 			// We update the ChannelMonitor on the backward link, after
 			// receiving an `update_fulfill_htlc` from the forward link.
-			let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+			let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo, &preimage_update);
 			if update_res != ChannelMonitorUpdateStatus::Completed {
 				// TODO: This needs to be handled somehow - if we receive a monitor update
 				// with a preimage we *must* somehow manage to propagate it to the upstream
@@ -7054,7 +7089,7 @@ where
 			// complete the monitor update completion action from `completion_action`.
 			self.pending_background_events.lock().unwrap().push(
 				BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
-					prev_hop.outpoint, prev_hop.channel_id, preimage_update,
+					prev_hop.funding_txo, prev_hop.channel_id, preimage_update,
 				)));
 		}
 		// Note that we do process the completion action here. This totally could be a
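
Several of the hunks above funnel monitor updates into `pending_background_events` when `during_init` is set, since the `ChainMonitor` cannot be driven directly while the `ChannelManager` is still being deserialized. The general queue-now, apply-later shape, as a minimal standalone sketch (toy types, not the LDK API):

use std::sync::Mutex;

struct MonitorUpdate { channel_id: u32, note: &'static str }

struct Node {
	during_init: bool,
	pending_background_events: Mutex<Vec<MonitorUpdate>>,
}

impl Node {
	fn apply_or_queue(&self, update: MonitorUpdate) {
		if self.during_init {
			// Too early to touch the chain monitor: remember the update and let
			// the startup background-event processing apply it later.
			self.pending_background_events.lock().unwrap().push(update);
		} else {
			self.apply_now(&update);
		}
	}

	fn apply_now(&self, update: &MonitorUpdate) {
		println!("applying {} to channel {}", update.note, update.channel_id);
	}

	fn process_background_events(&self) {
		for update in self.pending_background_events.lock().unwrap().drain(..) {
			self.apply_now(&update);
		}
	}
}

fn main() {
	let node = Node { during_init: true, pending_background_events: Mutex::new(Vec::new()) };
	node.apply_or_queue(MonitorUpdate { channel_id: 1, note: "payment preimage" });
	node.process_background_events();
}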
@@ -7305,7 +7340,7 @@ where
 					onion_fields,
 					payment_id,
 				}) = payment {
-					self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
+					let event = events::Event::PaymentClaimed {
 						payment_hash,
 						purpose,
 						amount_msat,
@@ -7314,7 +7349,16 @@ where
 						sender_intended_total_msat,
 						onion_fields,
 						payment_id,
-					}, None));
+					};
+					let event_action = (event, None);
+					let mut pending_events = self.pending_events.lock().unwrap();
+					// If we're replaying a claim on startup we may end up duplicating an event
+					// that's already in our queue, so check before we push another one. The
+					// `payment_id` should suffice to ensure we never spuriously drop a second
+					// event for a duplicate payment.
+					if !pending_events.contains(&event_action) {
+						pending_events.push_back(event_action);
+					}
 				}
 			},
 			MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
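
Because the restart path may regenerate a `PaymentClaimed` that is already sitting in the queue from before the crash, the event is now only pushed if an identical entry is not already present. The same check-before-push idea in isolation, assuming the queued items implement `PartialEq` as the real event/action tuples do:

use std::collections::VecDeque;

#[derive(Debug, PartialEq, Eq)]
struct PaymentClaimed { payment_id: u64, amount_msat: u64 }

fn push_dedup(queue: &mut VecDeque<PaymentClaimed>, event: PaymentClaimed) {
	// A linear `contains` scan is fine here: the pending-event queue is short,
	// and the `payment_id` makes genuinely distinct payments compare unequal.
	if !queue.contains(&event) {
		queue.push_back(event);
	}
}

fn main() {
	let mut queue = VecDeque::new();
	push_dedup(&mut queue, PaymentClaimed { payment_id: 1, amount_msat: 15_000_000 });
	push_dedup(&mut queue, PaymentClaimed { payment_id: 1, amount_msat: 15_000_000 });
	assert_eq!(queue.len(), 1); // the duplicate replayed on startup is dropped
}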
@@ -13121,67 +13165,126 @@ where
 		};
 
 		for (_, monitor) in args.channel_monitors.iter() {
-			for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
-				let per_peer_state = channel_manager.per_peer_state.read().unwrap();
-				let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
-				let payment = claimable_payments.claimable_payments.remove(&payment_hash);
-				mem::drop(claimable_payments);
-				if let Some(payment) = payment {
-					log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
-					let mut claimable_amt_msat = 0;
-					let mut receiver_node_id = Some(our_network_pubkey);
-					let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
-					if phantom_shared_secret.is_some() {
-						let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
-							.expect("Failed to get node_id for phantom node recipient");
-						receiver_node_id = Some(phantom_pubkey)
-					}
-					for claimable_htlc in &payment.htlcs {
-						claimable_amt_msat += claimable_htlc.value;
-
-						// Add a holding-cell claim of the payment to the Channel, which should be
-						// applied ~immediately on peer reconnection. Because it won't generate a
-						// new commitment transaction we can just provide the payment preimage to
-						// the corresponding ChannelMonitor and nothing else.
-						//
-						// We do so directly instead of via the normal ChannelMonitor update
-						// procedure as the ChainMonitor hasn't yet been initialized, implying
-						// we're not allowed to call it directly yet. Further, we do the update
-						// without incrementing the ChannelMonitor update ID as there isn't any
-						// reason to.
-						// If we were to generate a new ChannelMonitor update ID here and then
-						// crash before the user finishes block connect we'd end up force-closing
-						// this channel as well. On the flip side, there's no harm in restarting
-						// without the new monitor persisted - we'll end up right back here on
-						// restart.
-						let previous_channel_id = claimable_htlc.prev_hop.channel_id;
-						let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
-							.get(&claimable_htlc.prev_hop.outpoint).cloned();
-						if let Some(peer_node_id) = peer_node_id_opt {
-							let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
-							let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-							let peer_state = &mut *peer_state_lock;
-							if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
-								let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
-								channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+			for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
+				if !payment_claims.is_empty() {
+					for payment_claim in payment_claims {
+						if payment_claim.mpp_parts.is_empty() {
+							return Err(DecodeError::InvalidValue);
+						}
+						let pending_claims = PendingMPPClaim {
+							channels_without_preimage: payment_claim.mpp_parts.clone(),
+							channels_with_preimage: Vec::new(),
+						};
+						let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+
+						// While it may be duplicative to generate a PaymentClaimed here, trying to
+						// figure out if the user definitely saw it before shutdown would require some
+						// nontrivial logic and may break as we move away from regularly persisting
+						// ChannelManager. Instead, we rely on the users' event handler being
+						// idempotent and just blindly generate one no matter what, letting the
+						// preimages eventually timing out from ChannelMonitors to prevent us from
+						// doing so forever.
+
+						let claim_found =
+							channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
+								payment_hash, &channel_manager.node_signer, &channel_manager.logger,
+								&channel_manager.inbound_payment_id_secret, |_| Ok(()),
+							);
+						if claim_found.is_err() {
+							let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+							match claimable_payments.pending_claiming_payments.entry(payment_hash) {
+								hash_map::Entry::Occupied(_) => {
+									debug_assert!(false, "unreachable");
+									return Err(DecodeError::InvalidValue);
+								},
+								hash_map::Entry::Vacant(entry) => {
+									entry.insert(payment_claim.claiming_payment);
+								},
 							}
 						}
-						if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
-							previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+
+						for part in payment_claim.mpp_parts.iter() {
+							let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
+								part.counterparty_node_id, part.channel_id, part.htlc_id,
+								PendingMPPClaimPointer(Arc::clone(&ptr))
+							));
+							let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
+								RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+									pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
+								}
+							);
+							// Note that we don't need to pass the `payment_info` here - its
+							// already (clearly) durably on disk in on `ChannelMonitor` so there's
+							// no need to worry about getting it into others.
+							channel_manager.claim_mpp_part(
+								part.into(), payment_preimage, None,
+								|_, _|
+									(Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
+							);
 						}
 					}
-					let mut pending_events = channel_manager.pending_events.lock().unwrap();
-					let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
-					pending_events.push_back((events::Event::PaymentClaimed {
-						receiver_node_id,
-						payment_hash,
-						purpose: payment.purpose,
-						amount_msat: claimable_amt_msat,
-						htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
-						sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
-						onion_fields: payment.onion_fields,
-						payment_id: Some(payment_id),
-					}, None));
+				} else {
+					let per_peer_state = channel_manager.per_peer_state.read().unwrap();
+					let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+					let payment = claimable_payments.claimable_payments.remove(&payment_hash);
+					mem::drop(claimable_payments);
+					if let Some(payment) = payment {
+						log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
+						let mut claimable_amt_msat = 0;
+						let mut receiver_node_id = Some(our_network_pubkey);
+						let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
+						if phantom_shared_secret.is_some() {
+							let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
+								.expect("Failed to get node_id for phantom node recipient");
+							receiver_node_id = Some(phantom_pubkey)
+						}
+						for claimable_htlc in &payment.htlcs {
+							claimable_amt_msat += claimable_htlc.value;
+
+							// Add a holding-cell claim of the payment to the Channel, which should be
+							// applied ~immediately on peer reconnection. Because it won't generate a
+							// new commitment transaction we can just provide the payment preimage to
+							// the corresponding ChannelMonitor and nothing else.
+							//
+							// We do so directly instead of via the normal ChannelMonitor update
+							// procedure as the ChainMonitor hasn't yet been initialized, implying
+							// we're not allowed to call it directly yet. Further, we do the update
+							// without incrementing the ChannelMonitor update ID as there isn't any
+							// reason to.
+							// If we were to generate a new ChannelMonitor update ID here and then
+							// crash before the user finishes block connect we'd end up force-closing
+							// this channel as well. On the flip side, there's no harm in restarting
+							// without the new monitor persisted - we'll end up right back here on
+							// restart.
+							let previous_channel_id = claimable_htlc.prev_hop.channel_id;
+							let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
+								.get(&claimable_htlc.prev_hop.outpoint).cloned();
+							if let Some(peer_node_id) = peer_node_id_opt {
+								let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
+								let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+								let peer_state = &mut *peer_state_lock;
+								if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
+									let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
+									channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+								}
+							}
+							if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+								previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+							}
+						}
+						let mut pending_events = channel_manager.pending_events.lock().unwrap();
+						let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
+						pending_events.push_back((events::Event::PaymentClaimed {
+							receiver_node_id,
+							payment_hash,
+							purpose: payment.purpose,
+							amount_msat: claimable_amt_msat,
+							htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
+							sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
+							onion_fields: payment.onion_fields,
+							payment_id: Some(payment_id),
+						}, None));
+					}
 				}
 			}
 		}
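
The replay loop above is the heart of the change: for every preimage the monitor stored with MPP claim metadata it rebuilds the claim bookkeeping and feeds each part back through `claim_mpp_part`, while payments without such metadata (written by older versions) fall back to the pre-existing `ChannelManager`-driven path. A stripped-down sketch of that control flow, with invented types and the locking and completion actions omitted:

struct Part { channel_id: u32, htlc_id: u64 }

struct StoredPreimage {
	payment_preimage: [u8; 32],
	// Empty for monitors written by older versions without claim metadata.
	mpp_claims: Vec<Vec<Part>>,
}

#[derive(Debug)]
enum ReplayError { InvalidValue }

fn replay_on_startup(
	stored: &[StoredPreimage],
	mut claim_part: impl FnMut(&[u8; 32], &Part),
	mut legacy_reclaim: impl FnMut(&[u8; 32]),
) -> Result<(), ReplayError> {
	for entry in stored {
		if !entry.mpp_claims.is_empty() {
			for claim in &entry.mpp_claims {
				// New-format metadata must always list its parts; anything else
				// indicates a corrupt serialization, as in the real code.
				if claim.is_empty() {
					return Err(ReplayError::InvalidValue);
				}
				for part in claim {
					claim_part(&entry.payment_preimage, part);
				}
			}
		} else {
			// No metadata: fall back to the pre-existing ChannelManager-driven
			// reclaim path for this payment.
			legacy_reclaim(&entry.payment_preimage);
		}
	}
	Ok(())
}

fn main() {
	let stored = vec![StoredPreimage {
		payment_preimage: [7; 32],
		mpp_claims: vec![vec![Part { channel_id: 1, htlc_id: 0 }, Part { channel_id: 2, htlc_id: 5 }]],
	}];
	replay_on_startup(
		&stored,
		|_preimage, part| println!("re-claim htlc {} on chan {}", part.htlc_id, part.channel_id),
		|_preimage| println!("legacy reclaim"),
	).unwrap();
}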

lightning/src/ln/reload_tests.rs

+17 -5
@@ -878,27 +878,39 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) {
 	// Now restart nodes[3].
 	reload_node!(nodes[3], original_manager, &[&updated_monitor.0, &original_monitor.0], persister, new_chain_monitor, nodes_3_deserialized);
 
-	// On startup the preimage should have been copied into the non-persisted monitor:
+	// Until the startup background events are processed (in `get_and_clear_pending_events`,
+	// below), the preimage is not copied to the non-persisted monitor...
 	assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
-	assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+	assert_eq!(
+		get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash),
+		persist_both_monitors,
+	);
 
 	nodes[1].node.peer_disconnected(nodes[3].node.get_our_node_id());
 	nodes[2].node.peer_disconnected(nodes[3].node.get_our_node_id());
 
 	// During deserialization, we should have closed one channel and broadcast its latest
 	// commitment transaction. We should also still have the original PaymentClaimable event we
-	// never finished processing.
+	// never finished processing as well as a PaymentClaimed event regenerated when we replayed the
+	// preimage onto the non-persisted monitor.
 	let events = nodes[3].node.get_and_clear_pending_events();
 	assert_eq!(events.len(), if persist_both_monitors { 4 } else { 3 });
 	if let Event::PaymentClaimable { amount_msat: 15_000_000, .. } = events[0] { } else { panic!(); }
 	if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[1] { } else { panic!(); }
 	if persist_both_monitors {
 		if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[2] { } else { panic!(); }
-		check_added_monitors(&nodes[3], 2);
+		if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[3] { } else { panic!(); }
+		check_added_monitors(&nodes[3], 6);
 	} else {
-		check_added_monitors(&nodes[3], 1);
+		if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[2] { } else { panic!(); }
+		check_added_monitors(&nodes[3], 3);
 	}
 
+	// Now that we've processed background events, the preimage should have been copied into the
+	// non-persisted monitor:
+	assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
+	assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+
 	// On restart, we should also get a duplicate PaymentClaimed event as we persisted the
 	// ChannelManager prior to handling the original one.
 	if let Event::PaymentClaimed { payment_hash: our_payment_hash, amount_msat: 15_000_000, .. } =