Commit e612b72

Block monitor updates to ensure preimages are in each MPP part
If we claim an MPP payment and only persist some of the `ChannelMonitorUpdate`s which include the preimage prior to shutting down, we may be in a state where some of our `ChannelMonitor`s have the preimage for a payment while others do not.

This, it turns out, is actually mostly safe - on startup `ChannelManager` will detect there's a payment it has as unclaimed but there's a corresponding payment preimage in a `ChannelMonitor` and go claim the other MPP parts. This works so long as the `ChannelManager` has been persisted after the payment has been received but prior to the `PaymentClaimable` event being processed (and the claim itself occurring). This is not always true and certainly not required on our API, but our `lightning-background-processor` today does persist prior to event handling so it is generally true subject to some race conditions.

In order to address this we need to copy payment preimages across channels irrespective of the `ChannelManager`'s payment state, but this introduces another wrinkle - if one channel makes substantial progress while other channel(s) are still waiting to get the payment preimage in `ChannelMonitor`(s) while the `ChannelManager` hasn't been persisted after the payment was received, we may end up without the preimage on disk.

Here, we address this issue by using the new `RAAMonitorUpdateBlockingAction` variant for this specific case. We block persistence of an RAA `ChannelMonitorUpdate` which may remove the preimage from disk until all channels have had the preimage added to their `ChannelMonitor`. We do this only in-memory (and not on disk) as we can recreate this blocker during the startup re-claim logic.

This will enable us to claim MPP parts without using the `ChannelManager`'s payment state in later work.
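For intuition, here is a minimal sketch of the bookkeeping described above: a shared record tracks which channels still need the preimage written to their `ChannelMonitor`, and the RAA blockers for all MPP parts are only released once that set is empty. This is a simplified, hypothetical stand-in (plain `u64` channel ids and a free function instead of the commit's `PendingMPPClaim`/`PendingMPPClaimPointer` types and `ChannelManager` methods), not the actual API.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for the per-payment MPP claim state shared by all parts.
struct PendingMppClaimSketch {
    // Channels whose ChannelMonitor does not yet have the preimage persisted.
    channels_without_preimage: Vec<u64>,
    // Channels whose ChannelMonitor already has the preimage.
    channels_with_preimage: Vec<u64>,
}

// One shared pointer is cloned into an RAA-blocking action on every channel
// that carried a part of the MPP payment.
type ClaimPtr = Arc<Mutex<PendingMppClaimSketch>>;

// Called when channel `chan_id`'s preimage-carrying ChannelMonitorUpdate completes.
// Returns the channels whose RAA blockers can now be released, if any.
fn mark_preimage_persisted(claim: &ClaimPtr, chan_id: u64) -> Vec<u64> {
    let mut state = claim.lock().unwrap();
    if let Some(pos) = state.channels_without_preimage.iter().position(|c| *c == chan_id) {
        let done = state.channels_without_preimage.remove(pos);
        state.channels_with_preimage.push(done);
    }
    if state.channels_without_preimage.is_empty() {
        // Every monitor has the preimage; all blocked RAA updates may proceed.
        state.channels_with_preimage.clone()
    } else {
        Vec::new()
    }
}

fn main() {
    let claim: ClaimPtr = Arc::new(Mutex::new(PendingMppClaimSketch {
        channels_without_preimage: vec![1, 2],
        channels_with_preimage: Vec::new(),
    }));
    assert!(mark_preimage_persisted(&claim, 1).is_empty()); // channel 2 still pending
    assert_eq!(mark_preimage_persisted(&claim, 2), vec![1, 2]); // all parts persisted, free both
}
```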
1 parent cd00f7e commit e612b72

File tree

1 file changed (+143, −15 lines)

lightning/src/ln/channelmanager.rs
+143 −15
@@ -799,7 +799,13 @@ pub(crate) enum MonitorUpdateCompletionAction {
 	/// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
 	/// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
 	/// event can be generated.
-	PaymentClaimed { payment_hash: PaymentHash },
+	PaymentClaimed {
+		payment_hash: PaymentHash,
+		/// A pending MPP claim which hasn't yet completed.
+		///
+		/// Not written to disk.
+		pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+	},
 	/// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
 	/// operation of another channel.
 	///
@@ -833,7 +839,10 @@ pub(crate) enum MonitorUpdateCompletionAction {
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
-	(0, PaymentClaimed) => { (0, payment_hash, required) },
+	(0, PaymentClaimed) => {
+		(0, payment_hash, required),
+		(9999999999, pending_mpp_claim, (static_value, None)),
+	},
 	// Note that FreeOtherChannelImmediately should never be written - we were supposed to free
 	// *immediately*. However, for simplicity we implement read/write here.
 	(1, FreeOtherChannelImmediately) => {
@@ -6214,13 +6223,44 @@ where
 			return;
 		}
 		if valid_mpp {
-			for htlc in sources.drain(..) {
+			let mut pending_claim_ptr_opt = None;
+			let mut source_claim_pairs = Vec::with_capacity(sources.len());
+			if sources.len() > 1 {
+				let mut pending_claims = PendingMPPClaim {
+					channels_without_preimage: Vec::new(),
+					channels_with_preimage: Vec::new(),
+				};
+				for htlc in sources.drain(..) {
+					if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+						let htlc_id = htlc.prev_hop.htlc_id;
+						let chan_id = htlc.prev_hop.channel_id;
+						let chan_outpoint = htlc.prev_hop.outpoint;
+						pending_claims.channels_without_preimage.push((cp_id, chan_outpoint, chan_id, htlc_id));
+						source_claim_pairs.push((htlc, Some((cp_id, chan_id, htlc_id))));
+					}
+				}
+				pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+			} else {
+				for htlc in sources.drain(..) {
+					source_claim_pairs.push((htlc, None));
+				}
+			}
+			for (htlc, mpp_claim) in source_claim_pairs.drain(..) {
+				let mut pending_mpp_claim = None;
+				let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|pending_claim| {
+					pending_mpp_claim = mpp_claim.map(|(cp_id, chan_id, htlc_id)|
+						(cp_id, chan_id, htlc_id, PendingMPPClaimPointer(Arc::clone(pending_claim)))
+					);
+					RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+						pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
+					}
+				});
 				let prev_hop_chan_id = htlc.prev_hop.channel_id;
 				if let Err((pk, err)) = self.claim_funds_from_hop(
 					htlc.prev_hop, payment_preimage,
 					|_, definitely_duplicate| {
 						debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
-						Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+						(Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
 					}
 				) {
 					if let msgs::ErrorAction::IgnoreError = err.err.action {
@@ -6251,7 +6291,7 @@ where
 		}
 	}
 
-	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
+	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)>(&self,
 		prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
 	-> Result<(), (PublicKey, MsgHandleErrInternal)> {
 		//TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -6290,11 +6330,15 @@ where
 
 				match fulfill_res {
 					UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
-						if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+						let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+						if let Some(action) = action_opt {
 							log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
 								chan_id, action);
 							peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
 						}
+						if let Some(raa_blocker) = raa_blocker_opt {
+							peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+						}
 						if !during_init {
 							handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
 								peer_state, per_peer_state, chan);
@@ -6312,11 +6356,16 @@ where
 						}
 					}
 					UpdateFulfillCommitFetch::DuplicateClaim {} => {
-						let action = if let Some(action) = completion_action(None, true) {
+						let (action_opt, raa_blocker_opt) = completion_action(None, true);
+						let action = if let Some(action) = action_opt {
 							action
 						} else {
 							return Ok(());
 						};
+						if let Some(raa_blocker) = raa_blocker_opt {
+							debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+						}
+
 						mem::drop(peer_state_lock);
 
 						log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6403,7 +6452,47 @@ where
 		// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
 		// generally always allowed to be duplicative (and it's specifically noted in
 		// `PaymentForwarded`).
-		self.handle_monitor_update_completion_actions(completion_action(None, false));
+		let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+		if let Some(raa_blocker) = raa_blocker_opt {
+			let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+				// prev_hop.counterparty_node_id is always available for payments received after
+				// LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
+				// look up the counterparty in the `action_opt`, if possible.
+				if let Some(action) = &action_opt {
+					if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+						if let Some((node_id, _, _, _)) = pending_mpp_claim {
+							Some(*node_id)
+						} else { None }
+					} else { None }
+				} else { None });
+			if let Some(counterparty_node_id) = counterparty_node_id {
+				// TODO: Avoid always blocking the world for the write lock here.
+				let mut per_peer_state = self.per_peer_state.write().unwrap();
+				let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
+					Mutex::new(PeerState {
+						channel_by_id: new_hash_map(),
+						inbound_channel_request_by_id: new_hash_map(),
+						latest_features: InitFeatures::empty(),
+						pending_msg_events: Vec::new(),
+						in_flight_monitor_updates: BTreeMap::new(),
+						monitor_update_blocked_actions: BTreeMap::new(),
+						actions_blocking_raa_monitor_updates: BTreeMap::new(),
+						is_connected: false,
+					}));
+				let mut peer_state = peer_state_mutex.lock().unwrap();
+
+				peer_state.actions_blocking_raa_monitor_updates
+					.entry(prev_hop.channel_id)
+					.or_insert_with(Vec::new)
+					.push(raa_blocker);
+			} else {
+				debug_assert!(false,
+					"RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
+			}
+		}
+
+		self.handle_monitor_update_completion_actions(action_opt);
 		Ok(())
 	}
 
@@ -6503,16 +6592,16 @@ where
 						}
 					}), "{:?}", *background_events);
 				}
-				None
+				(None, None)
 			} else if definitely_duplicate {
 				if let Some(other_chan) = chan_to_release {
-					Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+					(Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
 						downstream_counterparty_node_id: other_chan.counterparty_node_id,
 						downstream_funding_outpoint: other_chan.funding_txo,
 						downstream_channel_id: other_chan.channel_id,
 						blocking_action: other_chan.blocking_action,
-					})
-				} else { None }
+					}), None)
+				} else { (None, None) }
 			} else {
 				let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
 					if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -6521,7 +6610,7 @@ where
 				} else { None };
 				debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
 					"skimmed_fee_msat must always be included in total_fee_earned_msat");
-				Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+				(Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
 					event: events::Event::PaymentForwarded {
 						prev_channel_id: Some(prev_channel_id),
 						next_channel_id: Some(next_channel_id),
@@ -6533,7 +6622,7 @@ where
 						outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
 					},
 					downstream_counterparty_and_funding_outpoint: chan_to_release,
-				})
+				}), None)
 			}
 		});
 		if let Err((pk, err)) = res {
@@ -6554,9 +6643,44 @@ where
 		debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
 		debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
 
+		let mut freed_channels = Vec::new();
+
 		for action in actions.into_iter() {
 			match action {
-				MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
+				MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
+					if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+						let per_peer_state = self.per_peer_state.read().unwrap();
+						per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+							let mut peer_state = peer_state_mutex.lock().unwrap();
+							let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+							if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
+								blockers.get_mut().retain(|blocker|
+									if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+										if *pending_claim == claim_ptr {
+											let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+											let pending_claim_state = &mut *pending_claim_state_lock;
+											pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+												if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+													pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+													false
+												} else { true }
+											});
+											if pending_claim_state.channels_without_preimage.is_empty() {
+												for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+													freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+												}
+											}
+											!pending_claim_state.channels_without_preimage.is_empty()
+										} else { true }
+									} else { true }
+								);
+								if blockers.get().is_empty() {
+									blockers.remove();
+								}
+							}
+						});
+					}
+
 					let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
 					if let Some(ClaimingPayment {
 						amount_msat,
@@ -6600,6 +6724,10 @@ where
 				},
 			}
 		}
+
+		for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+			self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+		}
 	}
 
 	/// Handles a channel reentering a functional state, either due to reconnect or a monitor
