@@ -563,6 +563,7 @@ struct ClaimablePayments {
563
563
/// usually because we're running pre-full-init. They are handled immediately once we detect we are
564
564
/// running normally, and specifically must be processed before any other non-background
565
565
/// [`ChannelMonitorUpdate`]s are applied.
566
+ #[derive(Debug)]
566
567
enum BackgroundEvent {
567
568
/// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
568
569
/// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
@@ -5381,8 +5382,11 @@ where
5381
5382
for htlc in sources.drain(..) {
5382
5383
if let Err((pk, err)) = self.claim_funds_from_hop(
5383
5384
htlc.prev_hop, payment_preimage,
5384
- |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
5385
- {
5385
+ |_, definitely_duplicate| {
5386
+ debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
5387
+ Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
5388
+ }
5389
+ ) {
5386
5390
if let msgs::ErrorAction::IgnoreError = err.err.action {
5387
5391
// We got a temporary failure updating monitor, but will claim the
5388
5392
// HTLC when the monitor updating is restored (or on chain).
@@ -5410,7 +5414,7 @@ where
5410
5414
}
5411
5415
}
5412
5416
5413
- fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
5417
+ fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool ) -> Option<MonitorUpdateCompletionAction>>(&self,
5414
5418
prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
5415
5419
-> Result<(), (PublicKey, MsgHandleErrInternal)> {
5416
5420
//TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -5420,6 +5424,11 @@ where
5420
5424
// `BackgroundEvent`s.
5421
5425
let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
5422
5426
5427
+ // As we may call handle_monitor_update_completion_actions in rather rare cases, check that
5428
+ // the required mutexes are not held before we start.
5429
+ debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
5430
+ debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
5431
+
5423
5432
{
5424
5433
let per_peer_state = self.per_peer_state.read().unwrap();
5425
5434
let chan_id = prev_hop.outpoint.to_channel_id();
@@ -5441,25 +5450,70 @@ where
5441
5450
let counterparty_node_id = chan.context.get_counterparty_node_id();
5442
5451
let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
5443
5452
5444
- if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
5445
- if let Some(action) = completion_action(Some(htlc_value_msat)) {
5446
- log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
5447
- chan_id, action);
5448
- peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
5453
+ match fulfill_res {
5454
+ UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
5455
+ if let Some(action) = completion_action(Some(htlc_value_msat), false) {
5456
+ log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
5457
+ chan_id, action);
5458
+ peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
5459
+ }
5460
+ if !during_init {
5461
+ handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
5462
+ peer_state, per_peer_state, chan);
5463
+ } else {
5464
+ // If we're running during init we cannot update a monitor directly -
5465
+ // they probably haven't actually been loaded yet. Instead, push the
5466
+ // monitor update as a background event.
5467
+ self.pending_background_events.lock().unwrap().push(
5468
+ BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5469
+ counterparty_node_id,
5470
+ funding_txo: prev_hop.outpoint,
5471
+ update: monitor_update.clone(),
5472
+ });
5473
+ }
5449
5474
}
5450
- if !during_init {
5451
- handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
5452
- peer_state, per_peer_state, chan);
5453
- } else {
5454
- // If we're running during init we cannot update a monitor directly -
5455
- // they probably haven't actually been loaded yet. Instead, push the
5456
- // monitor update as a background event.
5457
- self.pending_background_events.lock().unwrap().push(
5458
- BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5459
- counterparty_node_id,
5460
- funding_txo: prev_hop.outpoint,
5461
- update: monitor_update.clone(),
5462
- });
5475
+ UpdateFulfillCommitFetch::DuplicateClaim {} => {
5476
+ let action = if let Some(action) = completion_action(None, true) {
5477
+ action
5478
+ } else {
5479
+ return Ok(());
5480
+ };
5481
+ mem::drop(peer_state_lock);
5482
+
5483
+ log_trace!(self.logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
5484
+ chan_id, action);
5485
+ let (node_id, funding_outpoint, blocker) =
5486
+ if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
5487
+ downstream_counterparty_node_id: node_id,
5488
+ downstream_funding_outpoint: funding_outpoint,
5489
+ blocking_action: blocker,
5490
+ } = action {
5491
+ (node_id, funding_outpoint, blocker)
5492
+ } else {
5493
+ debug_assert!(false,
5494
+ "Duplicate claims should always free another channel immediately");
5495
+ return Ok(());
5496
+ };
5497
+ if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
5498
+ let mut peer_state = peer_state_mtx.lock().unwrap();
5499
+ if let Some(blockers) = peer_state
5500
+ .actions_blocking_raa_monitor_updates
5501
+ .get_mut(&funding_outpoint.to_channel_id())
5502
+ {
5503
+ let mut found_blocker = false;
5504
+ blockers.retain(|iter| {
5505
+ // Note that we could actually be blocked, in
5506
+ // which case we need to only remove the one
5507
+ // blocker which was added duplicatively.
5508
+ let first_blocker = !found_blocker;
5509
+ if *iter == blocker { found_blocker = true; }
5510
+ *iter != blocker || !first_blocker
5511
+ });
5512
+ debug_assert!(found_blocker);
5513
+ }
5514
+ } else {
5515
+ debug_assert!(false);
5516
+ }
5463
5517
}
5464
5518
}
5465
5519
}
@@ -5507,7 +5561,7 @@ where
5507
5561
// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
5508
5562
// generally always allowed to be duplicative (and it's specifically noted in
5509
5563
// `PaymentForwarded`).
5510
- self.handle_monitor_update_completion_actions(completion_action(None));
5564
+ self.handle_monitor_update_completion_actions(completion_action(None, false ));
5511
5565
Ok(())
5512
5566
}
5513
5567
@@ -5537,13 +5591,74 @@ where
5537
5591
HTLCSource::PreviousHopData(hop_data) => {
5538
5592
let prev_outpoint = hop_data.outpoint;
5539
5593
let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
5594
+ #[cfg(debug_assertions)]
5595
+ let claiming_chan_funding_outpoint = hop_data.outpoint;
5540
5596
let res = self.claim_funds_from_hop(hop_data, payment_preimage,
5541
- |htlc_claim_value_msat| {
5542
- if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
5543
- let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
5544
- Some(claimed_htlc_value - forwarded_htlc_value)
5545
- } else { None };
5597
+ |htlc_claim_value_msat, definitely_duplicate| {
5598
+ let chan_to_release =
5599
+ if let Some(node_id) = next_channel_counterparty_node_id {
5600
+ Some((node_id, next_channel_outpoint, completed_blocker))
5601
+ } else {
5602
+ // We can only get `None` here if we are processing a
5603
+ // `ChannelMonitor`-originated event, in which case we
5604
+ // don't care about ensuring we wake the downstream
5605
+ // channel's monitor updating - the channel is already
5606
+ // closed.
5607
+ None
5608
+ };
5546
5609
5610
+ if definitely_duplicate && startup_replay {
5611
+ // On startup we may get redundant claims which are related to
5612
+ // monitor updates still in flight. In that case, we shouldn't
5613
+ // immediately free, but instead let that monitor update complete
5614
+ // in the background.
5615
+ #[cfg(debug_assertions)] {
5616
+ let background_events = self.pending_background_events.lock().unwrap();
5617
+ // There should be a `BackgroundEvent` pending...
5618
+ assert!(background_events.iter().any(|ev| {
5619
+ match ev {
5620
+ // to apply a monitor update that blocked the claiming channel,
5621
+ BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5622
+ funding_txo, update, ..
5623
+ } => {
5624
+ if *funding_txo == claiming_chan_funding_outpoint {
5625
+ assert!(update.updates.iter().any(|upd|
5626
+ if let ChannelMonitorUpdateStep::PaymentPreimage {
5627
+ payment_preimage: update_preimage
5628
+ } = upd {
5629
+ payment_preimage == *update_preimage
5630
+ } else { false }
5631
+ ), "{:?}", update);
5632
+ true
5633
+ } else { false }
5634
+ },
5635
+ // or the channel we'd unblock is already closed,
5636
+ BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, ..))
5637
+ => *funding_txo == next_channel_outpoint,
5638
+ // or the monitor update has completed and will unblock
5639
+ // immediately once we get going.
5640
+ BackgroundEvent::MonitorUpdatesComplete {
5641
+ channel_id, ..
5642
+ } =>
5643
+ *channel_id == claiming_chan_funding_outpoint.to_channel_id(),
5644
+ }
5645
+ }), "{:?}", *background_events);
5646
+ }
5647
+ None
5648
+ } else if definitely_duplicate {
5649
+ if let Some(other_chan) = chan_to_release {
5650
+ Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
5651
+ downstream_counterparty_node_id: other_chan.0,
5652
+ downstream_funding_outpoint: other_chan.1,
5653
+ blocking_action: other_chan.2,
5654
+ })
5655
+ } else { None }
5656
+ } else {
5657
+ let fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
5658
+ if let Some(claimed_htlc_value) = htlc_claim_value_msat {
5659
+ Some(claimed_htlc_value - forwarded_htlc_value)
5660
+ } else { None }
5661
+ } else { None };
5547
5662
Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
5548
5663
event: events::Event::PaymentForwarded {
5549
5664
fee_earned_msat,
@@ -5552,19 +5667,9 @@ where
5552
5667
next_channel_id: Some(next_channel_outpoint.to_channel_id()),
5553
5668
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
5554
5669
},
5555
- downstream_counterparty_and_funding_outpoint:
5556
- if let Some(node_id) = next_channel_counterparty_node_id {
5557
- Some((node_id, next_channel_outpoint, completed_blocker))
5558
- } else {
5559
- // We can only get `None` here if we are processing a
5560
- // `ChannelMonitor`-originated event, in which case we
5561
- // don't care about ensuring we wake the downstream
5562
- // channel's monitor updating - the channel is already
5563
- // closed.
5564
- None
5565
- },
5670
+ downstream_counterparty_and_funding_outpoint: chan_to_release,
5566
5671
})
5567
- } else { None }
5672
+ }
5568
5673
});
5569
5674
if let Err((pk, err)) = res {
5570
5675
let result: Result<(), _> = Err(err);
@@ -5580,6 +5685,10 @@ where
5580
5685
}
5581
5686
5582
5687
fn handle_monitor_update_completion_actions<I: IntoIterator<Item=MonitorUpdateCompletionAction>>(&self, actions: I) {
5688
+ debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
5689
+ debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
5690
+ debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
5691
+
5583
5692
for action in actions.into_iter() {
5584
5693
match action {
5585
5694
MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
0 commit comments