Commit 9cc6e08
Block monitor updates to ensure preimages are in each MPP part
If we claim an MPP payment and only persist some of the `ChannelMonitorUpdate`s which include the preimage prior to shutting down, we may be in a state where some of our `ChannelMonitor`s have the preimage for a payment while others do not.

This, it turns out, is actually mostly safe - on startup the `ChannelManager` will detect that it has a payment marked as unclaimed while a corresponding payment preimage exists in a `ChannelMonitor`, and will go claim the other MPP parts. This works so long as the `ChannelManager` has been persisted after the payment was received but prior to the `PaymentClaimable` event being processed (and the claim itself occurring). This is not always true, and is certainly not required by our API, but our `lightning-background-processor` today does persist prior to event handling, so it is generally true subject to some race conditions.

In order to address this we need to copy payment preimages across channels irrespective of the `ChannelManager`'s payment state, but this introduces another wrinkle - if one channel makes substantial progress while other channel(s) are still waiting to get the payment preimage into their `ChannelMonitor`(s), and the `ChannelManager` has not been persisted since the payment was received, we may end up without the preimage on disk.

Here, we address this issue by using the new `RAAMonitorUpdateBlockingAction` variant for this specific case. We block persistence of an RAA `ChannelMonitorUpdate` which may remove the preimage from disk until all channels have had the preimage added to their `ChannelMonitor`. We do this only in-memory (and not on disk) as we can recreate this blocker during the startup re-claim logic.

This will enable us to claim MPP parts without using the `ChannelManager`'s payment state in later work.
1 parent 8933a71 commit 9cc6e08
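Before the diff, a minimal, self-contained sketch of the bookkeeping described above, assuming simplified stand-in types: the names loosely mirror the `PendingMPPClaim` state and the `ClaimedMPPPayment` RAA blocker introduced below, but the structs, type aliases, and the `mark_channel_has_preimage` helper are illustrative only, not the actual LDK types or APIs.

use std::sync::{Arc, Mutex};

// Simplified stand-ins for LDK's identifiers, for illustration only.
type NodeId = [u8; 33];
type ChannelId = [u8; 32];

/// Per-payment state shared by every MPP part: which channels still need the
/// preimage persisted in their ChannelMonitor, and which already have it.
struct PendingMppClaim {
    channels_without_preimage: Vec<(NodeId, ChannelId)>,
    channels_with_preimage: Vec<(NodeId, ChannelId)>,
}

/// One shared pointer to the claim state is cloned into an RAA-blocking action
/// on every channel participating in the claim, so no channel lets an RAA
/// ChannelMonitorUpdate (which could let the preimage be pruned) through until
/// all parts hold the preimage.
struct ClaimedMppPaymentBlocker {
    pending_claim: Arc<Mutex<PendingMppClaim>>,
}

impl ClaimedMppPaymentBlocker {
    /// Records that `chan` now has the preimage in its monitor. Returns true
    /// once every part has it, i.e. the RAA blockers can be released.
    fn mark_channel_has_preimage(&self, node: NodeId, chan: ChannelId) -> bool {
        let mut state = self.pending_claim.lock().unwrap();
        let before = state.channels_without_preimage.len();
        state.channels_without_preimage.retain(|(n, c)| !(*n == node && *c == chan));
        if state.channels_without_preimage.len() != before {
            state.channels_with_preimage.push((node, chan));
        }
        state.channels_without_preimage.is_empty()
    }
}

fn main() {
    // Two hypothetical channels carrying one MPP payment.
    let (chan_a, chan_b, node) = ([1u8; 32], [2u8; 32], [3u8; 33]);
    let blocker = ClaimedMppPaymentBlocker {
        pending_claim: Arc::new(Mutex::new(PendingMppClaim {
            channels_without_preimage: vec![(node, chan_a), (node, chan_b)],
            channels_with_preimage: Vec::new(),
        })),
    };
    assert!(!blocker.mark_channel_has_preimage(node, chan_a)); // still blocked
    assert!(blocker.mark_channel_has_preimage(node, chan_b)); // all parts done
}

In the real change the shared state also tracks funding outpoints and HTLC ids, and once every part has the preimage the freed channels are released via `handle_monitor_update_release` rather than a boolean return, as the diff below shows.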

File tree

1 file changed: +145 -19 lines changed

lightning/src/ln/channelmanager.rs

+145 -19
@@ -799,7 +799,13 @@ pub(crate) enum MonitorUpdateCompletionAction {
     /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
     /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
     /// event can be generated.
-    PaymentClaimed { payment_hash: PaymentHash },
+    PaymentClaimed {
+        payment_hash: PaymentHash,
+        /// A pending MPP claim which hasn't yet completed.
+        ///
+        /// Not written to disk.
+        pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+    },
     /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
     /// operation of another channel.
     ///
@@ -833,7 +839,10 @@ pub(crate) enum MonitorUpdateCompletionAction {
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
-    (0, PaymentClaimed) => { (0, payment_hash, required) },
+    (0, PaymentClaimed) => {
+        (0, payment_hash, required),
+        (9999999999, pending_mpp_claim, (static_value, None)),
+    },
     // Note that FreeOtherChannelImmediately should never be written - we were supposed to free
     // *immediately*. However, for simplicity we implement read/write here.
     (1, FreeOtherChannelImmediately) => {
@@ -6200,7 +6209,7 @@ where
 
         let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
-        let mut sources = {
+        let sources = {
             let mut claimable_payments = self.claimable_payments.lock().unwrap();
             if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) {
                 let mut receiver_node_id = self.our_network_pubkey;
@@ -6295,18 +6304,46 @@ where
             return;
         }
         if valid_mpp {
-            for htlc in sources.drain(..) {
+            let pending_mpp_claim_ptr_opt = if sources.len() > 1 {
+                let channels_without_preimage = sources.iter().filter_map(|htlc| {
+                    if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+                        let prev_hop = &htlc.prev_hop;
+                        Some((cp_id, prev_hop.outpoint, prev_hop.channel_id, prev_hop.htlc_id))
+                    } else {
+                        None
+                    }
+                }).collect();
+                Some(Arc::new(Mutex::new(PendingMPPClaim {
+                    channels_without_preimage,
+                    channels_with_preimage: Vec::new(),
+                })))
+            } else {
+                None
+            };
+            for htlc in sources {
+                let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim|
+                    if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+                        let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim));
+                        Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr))
+                    } else {
+                        None
+                    }
+                );
+                let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| {
+                    RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+                        pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
+                    }
+                });
                 self.claim_funds_from_hop(
                     htlc.prev_hop, payment_preimage,
                     |_, definitely_duplicate| {
                         debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
-                        Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+                        (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim: this_mpp_claim }), raa_blocker)
                     }
                 );
             }
-        }
-        if !valid_mpp {
-            for htlc in sources.drain(..) {
+        } else {
+            for htlc in sources {
                 let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
                 htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
                 let source = HTLCSource::PreviousHopData(htlc.prev_hop);
@@ -6324,7 +6361,9 @@ where
         }
     }
 
-    fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(
+    fn claim_funds_from_hop<
+        ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+    >(
         &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
         completion_action: ComplFunc,
     ) {
@@ -6364,11 +6403,15 @@ where
 
                 match fulfill_res {
                     UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
-                        if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+                        let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+                        if let Some(action) = action_opt {
                             log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
                                 chan_id, action);
                             peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                         }
+                        if let Some(raa_blocker) = raa_blocker_opt {
+                            peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+                        }
                         if !during_init {
                             handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
                                 peer_state, per_peer_state, chan);
@@ -6386,11 +6429,16 @@ where
                         }
                     }
                     UpdateFulfillCommitFetch::DuplicateClaim {} => {
-                        let action = if let Some(action) = completion_action(None, true) {
+                        let (action_opt, raa_blocker_opt) = completion_action(None, true);
+                        if let Some(raa_blocker) = raa_blocker_opt {
+                            debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+                        }
+                        let action = if let Some(action) = action_opt {
                             action
                         } else {
                             return;
                         };
+
                         mem::drop(peer_state_lock);
 
                         log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6477,7 +6525,46 @@ where
         // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
         // generally always allowed to be duplicative (and it's specifically noted in
         // `PaymentForwarded`).
-        self.handle_monitor_update_completion_actions(completion_action(None, false));
+        let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+        if let Some(raa_blocker) = raa_blocker_opt {
+            let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+                // prev_hop.counterparty_node_id is always available for payments received after
+                // LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
+                // look up the counterparty in the `action_opt`, if possible.
+                action_opt.as_ref().and_then(|action|
+                    if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+                        pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id)
+                    } else { None }
+                )
+            );
+            if let Some(counterparty_node_id) = counterparty_node_id {
+                // TODO: Avoid always blocking the world for the write lock here.
+                let mut per_peer_state = self.per_peer_state.write().unwrap();
+                let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
+                    Mutex::new(PeerState {
+                        channel_by_id: new_hash_map(),
+                        inbound_channel_request_by_id: new_hash_map(),
+                        latest_features: InitFeatures::empty(),
+                        pending_msg_events: Vec::new(),
+                        in_flight_monitor_updates: BTreeMap::new(),
+                        monitor_update_blocked_actions: BTreeMap::new(),
+                        actions_blocking_raa_monitor_updates: BTreeMap::new(),
+                        is_connected: false,
+                    }));
+                let mut peer_state = peer_state_mutex.lock().unwrap();
+
+                peer_state.actions_blocking_raa_monitor_updates
+                    .entry(prev_hop.channel_id)
+                    .or_insert_with(Vec::new)
+                    .push(raa_blocker);
+            } else {
+                debug_assert!(false,
+                    "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
+            }
+        }
+
+        self.handle_monitor_update_completion_actions(action_opt);
     }
 
     fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -6576,16 +6663,16 @@ where
                         }
                     }), "{:?}", *background_events);
                 }
-                (None, None)
+                (None, None)
             } else if definitely_duplicate {
                 if let Some(other_chan) = chan_to_release {
-                    Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                    (Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
                         downstream_counterparty_node_id: other_chan.counterparty_node_id,
                         downstream_funding_outpoint: other_chan.funding_txo,
                         downstream_channel_id: other_chan.channel_id,
                         blocking_action: other_chan.blocking_action,
-                    })
-                } else { None }
+                    }), None)
+                } else { (None, None) }
             } else {
                 let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
                     if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -6594,7 +6681,7 @@ where
                 } else { None };
                 debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
                     "skimmed_fee_msat must always be included in total_fee_earned_msat");
-                Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+                (Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                     event: events::Event::PaymentForwarded {
                         prev_channel_id: Some(prev_channel_id),
                         next_channel_id: Some(next_channel_id),
@@ -6606,7 +6693,7 @@ where
                         outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                     },
                     downstream_counterparty_and_funding_outpoint: chan_to_release,
-                })
+                }), None)
             }
         });
     },
@@ -6623,9 +6710,44 @@ where
         debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
         debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
 
+        let mut freed_channels = Vec::new();
+
         for action in actions.into_iter() {
             match action {
-                MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
+                MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
+                    if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+                        let per_peer_state = self.per_peer_state.read().unwrap();
+                        per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+                            let mut peer_state = peer_state_mutex.lock().unwrap();
+                            let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+                            if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
+                                blockers.get_mut().retain(|blocker|
+                                    if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+                                        if *pending_claim == claim_ptr {
+                                            let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+                                            let pending_claim_state = &mut *pending_claim_state_lock;
+                                            pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+                                                if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+                                                    pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+                                                    false
+                                                } else { true }
+                                            });
+                                            if pending_claim_state.channels_without_preimage.is_empty() {
+                                                for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+                                                    freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+                                                }
+                                            }
+                                            !pending_claim_state.channels_without_preimage.is_empty()
+                                        } else { true }
+                                    } else { true }
+                                );
+                                if blockers.get().is_empty() {
+                                    blockers.remove();
+                                }
+                            }
+                        });
+                    }
+
                     let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                     if let Some(ClaimingPayment {
                         amount_msat,
@@ -6669,6 +6791,10 @@ where
                 },
             }
         }
+
+        for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+            self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+        }
     }
 
     /// Handles a channel reentering a functional state, either due to reconnect or a monitor
