@@ -799,7 +799,13 @@ pub(crate) enum MonitorUpdateCompletionAction {
 	/// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
 	/// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
 	/// event can be generated.
-	PaymentClaimed { payment_hash: PaymentHash },
+	PaymentClaimed {
+		payment_hash: PaymentHash,
+		/// A pending MPP claim which hasn't yet completed.
+		///
+		/// Not written to disk.
+		pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+	},
 	/// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
 	/// operation of another channel.
 	///
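The new `pending_mpp_claim` field leans on two supporting types introduced elsewhere in this change. Their shape can be inferred from how the hunks below use them; a minimal self-contained sketch (field layout assumed from usage in this diff, with stand-in types, not code copied from the PR):

```rust
use std::sync::{Arc, Mutex};

// Stand-ins for LDK's real types, only to keep the sketch self-contained.
type PublicKey = [u8; 33];
type ChannelId = [u8; 32];
type OutPoint = ([u8; 32], u16);

/// Shared state for one in-flight multi-part claim: the parts still waiting
/// on their preimage, and the parts that already have it.
struct PendingMPPClaim {
	channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
	channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
}

/// Newtype over the shared pointer so equality can mean pointer identity,
/// which the blocker-matching logic further down relies on.
struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);

impl PartialEq for PendingMPPClaimPointer {
	fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
}
```

The pointer-identity `PartialEq` matters later: `*pending_claim == claim_ptr` is only meaningful if two pointers compare equal exactly when they refer to the same in-flight claim.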
@@ -833,7 +839,10 @@ pub(crate) enum MonitorUpdateCompletionAction {
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
-	(0, PaymentClaimed) => { (0, payment_hash, required) },
+	(0, PaymentClaimed) => {
+		(0, payment_hash, required),
+		(9999999999, pending_mpp_claim, (static_value, None)),
+	},
 	// Note that FreeOtherChannelImmediately should never be written - we were supposed to free
 	// *immediately*. However, for simplicity we implement read/write here.
 	(1, FreeOtherChannelImmediately) => {
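The `(static_value, None)` pairing, as used here, reads as: never serialize this field, and unconditionally restore it as `None` on read, which is why the TLV type number can be an arbitrarily large sentinel that never hits the wire and matches the "Not written to disk" doc comment. A hypothetical standalone illustration of that round-trip (not the actual macro expansion):

```rust
// Hypothetical illustration of the `static_value` round-trip semantics.
struct Action {
	payment_hash: [u8; 32],
	pending_mpp_claim: Option<u64>, // runtime-only stand-in; never persisted
}

fn write(a: &Action, out: &mut Vec<u8>) {
	// Only the persistent field is emitted; pending_mpp_claim is skipped,
	// so its (huge) TLV type number is never actually written.
	out.extend_from_slice(&a.payment_hash);
}

fn read(inp: &[u8]) -> Action {
	let mut payment_hash = [0u8; 32];
	payment_hash.copy_from_slice(&inp[..32]);
	// `static_value` semantics: the field is always restored as `None`.
	Action { payment_hash, pending_mpp_claim: None }
}
```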
@@ -6259,13 +6268,44 @@ where
 			return;
 		}
 		if valid_mpp {
-			for htlc in sources.drain(..) {
+			let mut pending_claim_ptr_opt = None;
+			let mut source_claim_pairs = Vec::with_capacity(sources.len());
+			if sources.len() > 1 {
+				let mut pending_claims = PendingMPPClaim {
+					channels_without_preimage: Vec::new(),
+					channels_with_preimage: Vec::new(),
+				};
+				for htlc in sources.drain(..) {
+					if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+						let htlc_id = htlc.prev_hop.htlc_id;
+						let chan_id = htlc.prev_hop.channel_id;
+						let chan_outpoint = htlc.prev_hop.outpoint;
+						pending_claims.channels_without_preimage.push((cp_id, chan_outpoint, chan_id, htlc_id));
+						source_claim_pairs.push((htlc, Some((cp_id, chan_id, htlc_id))));
+					}
+				}
+				pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+			} else {
+				for htlc in sources.drain(..) {
+					source_claim_pairs.push((htlc, None));
+				}
+			}
+			for (htlc, mpp_claim) in source_claim_pairs.drain(..) {
+				let mut pending_mpp_claim = None;
+				let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|pending_claim| {
+					pending_mpp_claim = mpp_claim.map(|(cp_id, chan_id, htlc_id)|
+						(cp_id, chan_id, htlc_id, PendingMPPClaimPointer(Arc::clone(pending_claim)))
+					);
+					RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+						pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
+					}
+				});
 				let prev_hop_chan_id = htlc.prev_hop.channel_id;
 				if let Err((pk, err)) = self.claim_funds_from_hop(
 					htlc.prev_hop, payment_preimage,
 					|_, definitely_duplicate| {
 						debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
-						Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+						(Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
 					}
 				) {
 					if let msgs::ErrorAction::IgnoreError = err.err.action {
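The heart of the new claim path is a single `Arc` shared by every part of a multi-part payment; each part's RAA blocker and `pending_mpp_claim` tuple clone it, so the last part to complete can tell it is last. A simplified, runnable sketch of that sharing pattern (types reduced to strings and integers; the real code tracks `PendingMPPClaim` state instead):

```rust
use std::sync::{Arc, Mutex};

fn main() {
	// Two HTLCs making up one MPP payment, keyed by (channel, htlc_id).
	let parts = vec![(String::from("chan_a"), 1u64), (String::from("chan_b"), 2u64)];
	// One shared ledger of parts still awaiting their preimage.
	let pending = Arc::new(Mutex::new(parts.clone()));
	for (chan, htlc) in parts {
		// Each claim keeps its own clone, mirroring PendingMPPClaimPointer.
		let ptr = Arc::clone(&pending);
		// ...the HTLC is claimed, then on monitor-update completion:
		let mut remaining = ptr.lock().unwrap();
		remaining.retain(|(c, h)| !(*c == chan && *h == htlc));
		if remaining.is_empty() {
			// The last part to complete sees an empty set and can release
			// the RAA blockers on every channel involved in the claim.
			println!("all MPP parts claimed; unblock RAA monitor updates");
		}
	}
}
```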
@@ -6296,7 +6336,7 @@ where
 			}
 		}
 
-	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
+	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)>(&self,
 		prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
 	-> Result<(), (PublicKey, MsgHandleErrInternal)> {
 		//TODO: Delay the claimed_funds relaying just like we do outbound relay!
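With this signature change, every completion callback answers two questions at once: which completion action, if any, to queue, and which RAA blocker, if any, to record. A tiny sketch of the new contract, with placeholder enums standing in for LDK's types:

```rust
// Placeholder types standing in for LDK's real action enums.
enum CompletionAction { PaymentClaimed }
enum RaaBlocker { ClaimedMppPayment }

// The callback now returns both halves; (None, None) means
// "nothing to queue and nothing to block on".
fn run<F>(completion_action: F)
where F: FnOnce(Option<u64>, bool) -> (Option<CompletionAction>, Option<RaaBlocker>) {
	let (action_opt, raa_blocker_opt) = completion_action(Some(1_000), false);
	if let Some(_action) = action_opt { /* queue the completion action */ }
	if let Some(_raa_blocker) = raa_blocker_opt { /* record the RAA blocker */ }
}

fn main() {
	run(|htlc_value_msat, _definitely_duplicate| {
		assert_eq!(htlc_value_msat, Some(1_000));
		(Some(CompletionAction::PaymentClaimed), Some(RaaBlocker::ClaimedMppPayment))
	});
}
```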
@@ -6335,11 +6375,15 @@ where
 
 		match fulfill_res {
 			UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
-				if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+				let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+				if let Some(action) = action_opt {
 					log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
 						chan_id, action);
 					peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
 				}
+				if let Some(raa_blocker) = raa_blocker_opt {
+					peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+				}
 				if !during_init {
 					handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
 						peer_state, per_peer_state, chan);
@@ -6357,11 +6401,16 @@ where
 				}
 			}
 			UpdateFulfillCommitFetch::DuplicateClaim {} => {
-				let action = if let Some(action) = completion_action(None, true) {
+				let (action_opt, raa_blocker_opt) = completion_action(None, true);
+				let action = if let Some(action) = action_opt {
 					action
 				} else {
 					return Ok(());
 				};
+				if let Some(raa_blocker) = raa_blocker_opt {
+					debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+				}
+
 				mem::drop(peer_state_lock);
 
 				log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6448,7 +6497,47 @@ where
 		// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
 		// generally always allowed to be duplicative (and it's specifically noted in
 		// `PaymentForwarded`).
-		self.handle_monitor_update_completion_actions(completion_action(None, false));
+		let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+		if let Some(raa_blocker) = raa_blocker_opt {
+			let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+				// prev_hop.counterparty_node_id is always available for payments received after
+				// LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
+				// look up the counterparty in the `action_opt`, if possible.
+				if let Some(action) = &action_opt {
+					if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+						if let Some((node_id, _, _, _)) = pending_mpp_claim {
+							Some(*node_id)
+						} else { None }
+					} else { None }
+				} else { None });
+			if let Some(counterparty_node_id) = counterparty_node_id {
+				// TODO: Avoid always blocking the world for the write lock here.
+				let mut per_peer_state = self.per_peer_state.write().unwrap();
+				let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
+					Mutex::new(PeerState {
+						channel_by_id: new_hash_map(),
+						inbound_channel_request_by_id: new_hash_map(),
+						latest_features: InitFeatures::empty(),
+						pending_msg_events: Vec::new(),
+						in_flight_monitor_updates: BTreeMap::new(),
+						monitor_update_blocked_actions: BTreeMap::new(),
+						actions_blocking_raa_monitor_updates: BTreeMap::new(),
+						is_connected: false,
+					}));
+				let mut peer_state = peer_state_mutex.lock().unwrap();
+
+				peer_state.actions_blocking_raa_monitor_updates
+					.entry(prev_hop.channel_id)
+					.or_insert_with(Vec::new)
+					.push(raa_blocker);
+			} else {
+				debug_assert!(false,
+					"RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
+			}
+		}
+
+		self.handle_monitor_update_completion_actions(action_opt);
 		Ok(())
 	}
 
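As a readability aside: the nested `if let` chain that recovers the counterparty from `action_opt` could be flattened with combinators. An equivalent sketch assuming the same types:

```rust
let counterparty_node_id = prev_hop.counterparty_node_id.or_else(|| match &action_opt {
	Some(MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. }) =>
		pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id),
	_ => None,
});
```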
@@ -6548,16 +6637,16 @@ where
 					}
 				}), "{:?}", *background_events);
 			}
-			None
+			(None, None)
 		} else if definitely_duplicate {
 			if let Some(other_chan) = chan_to_release {
-				Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+				(Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
 					downstream_counterparty_node_id: other_chan.counterparty_node_id,
 					downstream_funding_outpoint: other_chan.funding_txo,
 					downstream_channel_id: other_chan.channel_id,
 					blocking_action: other_chan.blocking_action,
-				})
-			} else { None }
+				}), None)
+			} else { (None, None) }
 		} else {
 			let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
 				if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -6566,7 +6655,7 @@ where
 			} else { None };
 			debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
 				"skimmed_fee_msat must always be included in total_fee_earned_msat");
-			Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+			(Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
 				event: events::Event::PaymentForwarded {
 					prev_channel_id: Some(prev_channel_id),
 					next_channel_id: Some(next_channel_id),
@@ -6578,7 +6667,7 @@ where
 					outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
 				},
 				downstream_counterparty_and_funding_outpoint: chan_to_release,
-			})
+			}), None)
 		}
 	});
 	if let Err((pk, err)) = res {
@@ -6599,9 +6688,44 @@ where
 		debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
 		debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
 
+		let mut freed_channels = Vec::new();
+
 		for action in actions.into_iter() {
 			match action {
-				MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
+				MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
+					if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+						let per_peer_state = self.per_peer_state.read().unwrap();
+						per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+							let mut peer_state = peer_state_mutex.lock().unwrap();
+							let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+							if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
+								blockers.get_mut().retain(|blocker|
+									if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+										if *pending_claim == claim_ptr {
+											let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+											let pending_claim_state = &mut *pending_claim_state_lock;
+											pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+												if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+													pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+													false
+												} else { true }
+											});
+											if pending_claim_state.channels_without_preimage.is_empty() {
+												for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+													freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+												}
+											}
+											!pending_claim_state.channels_without_preimage.is_empty()
+										} else { true }
+									} else { true }
+								);
+								if blockers.get().is_empty() {
+									blockers.remove();
+								}
+							}
+						});
+					}
+
 					let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
 					if let Some(ClaimingPayment {
 						amount_msat,
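The `retain` above does double duty: it moves the completed HTLC's entry from `channels_without_preimage` into `channels_with_preimage`, and its overall boolean result drops the blocker once nothing is left waiting. A reduced, runnable sketch of that move-and-test step:

```rust
fn main() {
	// (channel, htlc) pairs still waiting, and those already claimed.
	let mut without: Vec<(u32, u64)> = vec![(1, 10), (2, 20)];
	let mut with: Vec<(u32, u64)> = Vec::new();
	let (this_chan, this_htlc) = (1u32, 10u64);

	// Move the completed part across...
	without.retain(|&(c, h)| {
		if c == this_chan && h == this_htlc { with.push((c, h)); false } else { true }
	});
	// ...and keep the RAA blocker only while parts are still waiting.
	let keep_blocker = !without.is_empty();
	println!("keep blocker: {keep_blocker}, claimed so far: {with:?}");
}
```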
@@ -6645,6 +6769,10 @@ where
 				},
 			}
 		}
+
+		for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+			self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+		}
 	}
 
 	/// Handles a channel reentering a functional state, either due to reconnect or a monitor
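Note the two-phase shape: `freed_channels` is collected while the per-peer lock is held, and `handle_monitor_update_release` runs only after the loop, once locks are dropped. A minimal sketch of that collect-then-release pattern, with placeholder types:

```rust
use std::sync::Mutex;

fn main() {
	// Placeholder for per-channel blocker lists kept behind a lock.
	let blockers: Mutex<Vec<(u64, &'static str)>> =
		Mutex::new(vec![(1, "claimed_mpp"), (2, "claimed_mpp")]);
	let mut freed = Vec::new();
	{
		// Phase 1: under the lock, decide what is now unblocked.
		let mut guard = blockers.lock().unwrap();
		guard.retain(|(chan_id, _)| {
			let done = true; // stand-in for "last MPP part completed"
			if done { freed.push(*chan_id); }
			!done
		});
	} // lock released here
	// Phase 2: outside the lock, run the release handler for each channel,
	// which may itself need to take peer/channel locks.
	for chan_id in freed {
		println!("handle_monitor_update_release(chan {chan_id})");
	}
}
```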