@@ -563,6 +563,7 @@ struct ClaimablePayments {
563
563
/// usually because we're running pre-full-init. They are handled immediately once we detect we are
564
564
/// running normally, and specifically must be processed before any other non-background
565
565
/// [`ChannelMonitorUpdate`]s are applied.
566
+ #[derive(Debug)]
566
567
enum BackgroundEvent {
567
568
/// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
568
569
/// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
@@ -5375,8 +5376,11 @@ where
5375
5376
for htlc in sources.drain(..) {
5376
5377
if let Err((pk, err)) = self.claim_funds_from_hop(
5377
5378
htlc.prev_hop, payment_preimage,
5378
- |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
5379
- {
5379
+ |_, definitely_duplicate| {
5380
+ debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
5381
+ Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
5382
+ }
5383
+ ) {
5380
5384
if let msgs::ErrorAction::IgnoreError = err.err.action {
5381
5385
// We got a temporary failure updating monitor, but will claim the
5382
5386
// HTLC when the monitor updating is restored (or on chain).
@@ -5404,7 +5408,7 @@ where
5404
5408
}
5405
5409
}
5406
5410
5407
- fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
5411
+ fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool ) -> Option<MonitorUpdateCompletionAction>>(&self,
5408
5412
prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
5409
5413
-> Result<(), (PublicKey, MsgHandleErrInternal)> {
5410
5414
//TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -5414,6 +5418,11 @@ where
5414
5418
// `BackgroundEvent`s.
5415
5419
let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
5416
5420
5421
+ // As we may call handle_monitor_update_completion_actions in rather rare cases, check that
5422
+ // the required mutexes are not held before we start.
5423
+ debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
5424
+ debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
5425
+
5417
5426
{
5418
5427
let per_peer_state = self.per_peer_state.read().unwrap();
5419
5428
let chan_id = prev_hop.outpoint.to_channel_id();
@@ -5435,25 +5444,58 @@ where
5435
5444
let counterparty_node_id = chan.context.get_counterparty_node_id();
5436
5445
let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
5437
5446
5438
- if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
5439
- if let Some(action) = completion_action(Some(htlc_value_msat)) {
5440
- log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
5441
- chan_id, action);
5442
- peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
5447
+ match fulfill_res {
5448
+ UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
5449
+ if let Some(action) = completion_action(Some(htlc_value_msat), false) {
5450
+ log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
5451
+ chan_id, action);
5452
+ peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
5453
+ }
5454
+ if !during_init {
5455
+ handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
5456
+ peer_state, per_peer_state, chan);
5457
+ } else {
5458
+ // If we're running during init we cannot update a monitor directly -
5459
+ // they probably haven't actually been loaded yet. Instead, push the
5460
+ // monitor update as a background event.
5461
+ self.pending_background_events.lock().unwrap().push(
5462
+ BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5463
+ counterparty_node_id,
5464
+ funding_txo: prev_hop.outpoint,
5465
+ update: monitor_update.clone(),
5466
+ });
5467
+ }
5443
5468
}
5444
- if !during_init {
5445
- handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
5446
- peer_state, per_peer_state, chan);
5447
- } else {
5448
- // If we're running during init we cannot update a monitor directly -
5449
- // they probably haven't actually been loaded yet. Instead, push the
5450
- // monitor update as a background event.
5451
- self.pending_background_events.lock().unwrap().push(
5452
- BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5453
- counterparty_node_id,
5454
- funding_txo: prev_hop.outpoint,
5455
- update: monitor_update.clone(),
5456
- });
5469
+ UpdateFulfillCommitFetch::DuplicateClaim {} => {
5470
+ if let Some(action) = completion_action(None, true) {
5471
+ log_trace!(self.logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
5472
+ chan_id, action);
5473
+ mem::drop(peer_state_lock);
5474
+ if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
5475
+ downstream_counterparty_and_funding_outpoint
5476
+ } = action {
5477
+ let (node_id, funding_outpoint, blocker) =
5478
+ downstream_counterparty_and_funding_outpoint;
5479
+ if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
5480
+ let mut peer_state = peer_state_mtx.lock().unwrap();
5481
+ if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
5482
+ .get_mut(&funding_outpoint.to_channel_id())
5483
+ {
5484
+ let mut found_blocker = false;
5485
+ blockers.retain(|iter| {
5486
+ if *iter == blocker { found_blocker = true; }
5487
+ *iter != blocker
5488
+ });
5489
+ debug_assert!(found_blocker);
5490
+ }
5491
+ } else {
5492
+ debug_assert!(false);
5493
+ }
5494
+ } else {
5495
+ debug_assert!(false,
5496
+ "Duplicate claims should always free another channel immediately");
5497
+ }
5498
+ }
5457
5499
}
5458
5500
}
5459
5501
}
@@ -5501,7 +5543,7 @@ where
5501
5543
// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
5502
5544
// generally always allowed to be duplicative (and it's specifically noted in
5503
5545
// `PaymentForwarded`).
5504
- self.handle_monitor_update_completion_actions(completion_action(None));
5546
+ self.handle_monitor_update_completion_actions(completion_action(None, false ));
5505
5547
Ok(())
5506
5548
}
5507
5549
@@ -5531,33 +5573,72 @@ where
5531
5573
HTLCSource::PreviousHopData(hop_data) => {
5532
5574
let prev_outpoint = hop_data.outpoint;
5533
5575
let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
5576
+ #[cfg(debug_assertions)]
5577
+ let claiming_chan_funding_outpoint = hop_data.outpoint;
5534
5578
let res = self.claim_funds_from_hop(hop_data, payment_preimage,
5535
- |htlc_claim_value_msat| {
5579
+ |htlc_claim_value_msat, definitely_duplicate | {
5536
5580
if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
5537
5581
let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
5538
5582
Some(claimed_htlc_value - forwarded_htlc_value)
5539
5583
} else { None };
5540
5584
5541
- Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
5542
- event: events::Event::PaymentForwarded {
5543
- fee_earned_msat,
5544
- claim_from_onchain_tx: from_onchain,
5545
- prev_channel_id: Some(prev_outpoint.to_channel_id()),
5546
- next_channel_id: Some(next_channel_outpoint.to_channel_id()),
5547
- outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
5548
- },
5549
- downstream_counterparty_and_funding_outpoint:
5550
- if let Some(node_id) = next_channel_counterparty_node_id {
5551
- Some((node_id, next_channel_outpoint, completed_blocker))
5552
- } else {
5553
- // We can only get `None` here if we are processing a
5554
- // `ChannelMonitor`-originated event, in which case we
5555
- // don't care about ensuring we wake the downstream
5556
- // channel's monitor updating - the channel is already
5557
- // closed.
5558
- None
5585
+ let chan_to_release =
5586
+ if let Some(node_id) = next_channel_counterparty_node_id {
5587
+ Some((node_id, next_channel_outpoint, completed_blocker))
5588
+ } else {
5589
+ // We can only get `None` here if we are processing a
5590
+ // `ChannelMonitor`-originated event, in which case we
5591
+ // don't care about ensuring we wake the downstream
5592
+ // channel's monitor updating - the channel is already
5593
+ // closed.
5594
+ None
5595
+ };
5596
+
5597
+ if definitely_duplicate && startup_replay {
5598
+ // On startup we may get get redundant claims which are related to
5599
+ // monitor updates still in flight. In that case, we shouldn't
5600
+ // immediately free, but instead let that monitor update complete
5601
+ // in the background.
5602
+ #[cfg(debug_assertions)] {
5603
+ let background_events = self.pending_background_events.lock().unwrap();
5604
+ // There should be a `BackgroundEvent` pending...
5605
+ assert!(background_events.iter().any(|ev| {
5606
+ match ev {
5607
+ // to apply a monitor update that blocked channel,
5608
+ BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5609
+ funding_txo, ..
5610
+ } => *funding_txo == claiming_chan_funding_outpoint,
5611
+ // or the channel we'd unblock is already closed,
5612
+ BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, ..))
5613
+ => *funding_txo == next_channel_outpoint,
5614
+ // or the monitor update has completed and will unblock
5615
+ // immediately once we get going.
5616
+ BackgroundEvent::MonitorUpdatesComplete {
5617
+ channel_id, ..
5618
+ } =>
5619
+ *channel_id == claiming_chan_funding_outpoint.to_channel_id(),
5620
+ }
5621
+ }), "{:?}", *background_events);
5622
+ }
5623
+ None
5624
+ } else if definitely_duplicate {
5625
+ if let Some(other_chan) = chan_to_release {
5626
+ Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
5627
+ downstream_counterparty_and_funding_outpoint: other_chan,
5628
+ })
5629
+ } else { None }
5630
+ } else {
5631
+ Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
5632
+ event: events::Event::PaymentForwarded {
5633
+ fee_earned_msat,
5634
+ claim_from_onchain_tx: from_onchain,
5635
+ prev_channel_id: Some(prev_outpoint.to_channel_id()),
5636
+ next_channel_id: Some(next_channel_outpoint.to_channel_id()),
5637
+ outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
5559
5638
},
5560
- })
5639
+ downstream_counterparty_and_funding_outpoint: chan_to_release,
5640
+ })
5641
+ }
5561
5642
} else { None }
5562
5643
});
5563
5644
if let Err((pk, err)) = res {
@@ -5574,6 +5655,10 @@ where
5574
5655
}
5575
5656
5576
5657
fn handle_monitor_update_completion_actions<I: IntoIterator<Item=MonitorUpdateCompletionAction>>(&self, actions: I) {
5658
+ debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
5659
+ debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
5660
+ debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
5661
+
5577
5662
for action in actions.into_iter() {
5578
5663
match action {
5579
5664
MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
0 commit comments