Skip to content

Commit 7ebd003

Browse files
committed
Track payments after they resolve until all HTLCs are finalized
In the next commit, we will reload lost pending payments from ChannelMonitors during restart. However, in order to avoid re-adding pending payments which have already been fulfilled, we must ensure that we do not fully remove pending payments until all HTLCs for the payment have been fully removed from their ChannelMonitors. We do so here, introducing a new PendingOutboundPayment variant called `Completed` which only tracks the set of pending HTLCs.
1 parent 0e890f3 commit 7ebd003

File tree

1 file changed

+101
-11
lines changed

1 file changed

+101
-11
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 101 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,44 @@ enum PendingOutboundPayment {
416416
/// Our best known block height at the time this payment was initiated.
417417
starting_block_height: u32,
418418
},
419+
/// When a payment completes, we continue tracking it until all pendings HTLCs have been
420+
/// resolved. This ensures we don't look up pending payments in ChannelMonitors on restart and
421+
/// add a pending payment that was already completed.
422+
Completed {
423+
session_privs: HashSet<[u8; 32]>,
424+
},
419425
}
420426

421427
impl PendingOutboundPayment {
428+
fn is_retryable(&self) -> bool {
429+
match self {
430+
PendingOutboundPayment::Retryable { .. } => true,
431+
_ => false,
432+
}
433+
}
434+
fn is_completed(&self) -> bool {
435+
match self {
436+
PendingOutboundPayment::Completed { .. } => true,
437+
_ => false,
438+
}
439+
}
440+
441+
fn mark_completed(&mut self) {
442+
let mut session_privs = HashSet::new();
443+
core::mem::swap(&mut session_privs, match self {
444+
PendingOutboundPayment::Legacy { session_privs } |
445+
PendingOutboundPayment::Retryable { session_privs, .. } |
446+
PendingOutboundPayment::Completed { session_privs }
447+
=> session_privs
448+
});
449+
*self = PendingOutboundPayment::Completed { session_privs };
450+
}
451+
422452
fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: u64) -> bool {
423453
let remove_res = match self {
424454
PendingOutboundPayment::Legacy { session_privs } |
425-
PendingOutboundPayment::Retryable { session_privs, .. } => {
455+
PendingOutboundPayment::Retryable { session_privs, .. } |
456+
PendingOutboundPayment::Completed { session_privs } => {
426457
session_privs.remove(session_priv)
427458
}
428459
};
@@ -440,6 +471,7 @@ impl PendingOutboundPayment {
440471
PendingOutboundPayment::Retryable { session_privs, .. } => {
441472
session_privs.insert(session_priv)
442473
}
474+
PendingOutboundPayment::Completed { .. } => false
443475
};
444476
if insert_res {
445477
if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
@@ -452,7 +484,8 @@ impl PendingOutboundPayment {
452484
fn remaining_parts(&self) -> usize {
453485
match self {
454486
PendingOutboundPayment::Legacy { session_privs } |
455-
PendingOutboundPayment::Retryable { session_privs, .. } => {
487+
PendingOutboundPayment::Retryable { session_privs, .. } |
488+
PendingOutboundPayment::Completed { session_privs } => {
456489
session_privs.len()
457490
}
458491
}
@@ -1964,6 +1997,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
19641997
starting_block_height: self.best_block.read().unwrap().height(),
19651998
total_msat: total_value,
19661999
});
2000+
if !payment.is_retryable() {
2001+
return Err(APIError::RouteError {
2002+
err: "Payment already completed"
2003+
});
2004+
}
19672005

19682006
let err: Result<(), _> = loop {
19692007
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2177,7 +2215,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
21772215
return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
21782216
err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string()
21792217
}))
2180-
}
2218+
},
2219+
PendingOutboundPayment::Completed { .. } => {
2220+
return Err(PaymentSendFailure::ParameterError(APIError::RouteError {
2221+
err: "Payment already completed"
2222+
}));
2223+
},
21812224
}
21822225
} else {
21832226
return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
@@ -3005,7 +3048,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
30053048
session_priv_bytes.copy_from_slice(&session_priv[..]);
30063049
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
30073050
if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
3008-
if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
3051+
if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) &&
3052+
!payment.get().is_completed()
3053+
{
30093054
self.pending_events.lock().unwrap().push(
30103055
events::Event::PaymentPathFailed {
30113056
payment_hash,
@@ -3054,6 +3099,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
30543099
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
30553100
return;
30563101
}
3102+
if sessions.get().is_completed() {
3103+
log_trace!(self.logger, "Received failure of HTLC with payment_hash {} after payment completion", log_bytes!(payment_hash.0));
3104+
return;
3105+
}
30573106
if sessions.get().remaining_parts() == 0 {
30583107
all_paths_failed = true;
30593108
}
@@ -3300,15 +3349,45 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
33003349
} else { unreachable!(); }
33013350
}
33023351

3352+
fn finalize_claims(&self, mut sources: Vec<HTLCSource>) {
3353+
for source in sources.drain(..) {
3354+
if let HTLCSource::OutboundRoute { session_priv, payment_id, .. } = source {
3355+
let mut session_priv_bytes = [0; 32];
3356+
session_priv_bytes.copy_from_slice(&session_priv[..]);
3357+
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
3358+
if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
3359+
assert!(sessions.get().is_completed());
3360+
sessions.get_mut().remove(&session_priv_bytes, 0); // Note that the amount is no longer tracked
3361+
if sessions.get().remaining_parts() == 0 {
3362+
sessions.remove();
3363+
}
3364+
}
3365+
}
3366+
}
3367+
}
33033368
fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
33043369
match source {
33053370
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
33063371
mem::drop(channel_state_lock);
33073372
let mut session_priv_bytes = [0; 32];
33083373
session_priv_bytes.copy_from_slice(&session_priv[..]);
33093374
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
3310-
let found_payment = if let Some(mut sessions) = outbounds.remove(&payment_id) {
3311-
sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat)
3375+
let found_payment = if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
3376+
let found_payment = !sessions.get().is_completed();
3377+
sessions.get_mut().mark_completed();
3378+
if from_onchain {
3379+
// We currently immediately remove HTLCs which were fulfilled on-chain.
3380+
// This could potentially lead to removing a pending payment too early,
3381+
// with a reorg of one block causing us to re-add the completed payment on
3382+
// restart.
3383+
// TODO: We should have a second monitor event that informs us of payments
3384+
// irrevocably completing.
3385+
sessions.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat);
3386+
if sessions.get().remaining_parts() == 0 {
3387+
sessions.remove();
3388+
}
3389+
}
3390+
found_payment
33123391
} else { false };
33133392
if found_payment {
33143393
self.pending_events.lock().unwrap().push(
@@ -3949,6 +4028,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
39494028
});
39504029
}
39514030
break Ok((raa_updates.to_forward_htlcs, raa_updates.failed_htlcs,
4031+
raa_updates.finalized_claim_htlcs,
39524032
chan.get().get_short_channel_id()
39534033
.expect("RAA should only work on a short-id-available channel"),
39544034
chan.get().get_funding_txo().unwrap()))
@@ -3958,11 +4038,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
39584038
};
39594039
self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id);
39604040
match res {
3961-
Ok((pending_forwards, mut pending_failures, short_channel_id, channel_outpoint)) => {
4041+
Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
4042+
short_channel_id, channel_outpoint)) =>
4043+
{
39624044
for failure in pending_failures.drain(..) {
39634045
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
39644046
}
39654047
self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]);
4048+
self.finalize_claims(finalized_claim_htlcs);
39664049
Ok(())
39674050
},
39684051
Err(e) => Err(e)
@@ -5308,10 +5391,13 @@ impl_writeable_tlv_based!(PendingInboundPayment, {
53085391
(8, min_value_msat, required),
53095392
});
53105393

5311-
impl_writeable_tlv_based_enum!(PendingOutboundPayment,
5394+
impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment,
53125395
(0, Legacy) => {
53135396
(0, session_privs, required),
53145397
},
5398+
(1, Completed) => {
5399+
(0, session_privs, required),
5400+
},
53155401
(2, Retryable) => {
53165402
(0, session_privs, required),
53175403
(2, payment_hash, required),
@@ -5320,7 +5406,7 @@ impl_writeable_tlv_based_enum!(PendingOutboundPayment,
53205406
(8, pending_amt_msat, required),
53215407
(10, starting_block_height, required),
53225408
},
5323-
;);
5409+
);
53245410

53255411
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
53265412
where M::Target: chain::Watch<Signer>,
@@ -5413,7 +5499,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54135499
// For backwards compat, write the session privs and their total length.
54145500
let mut num_pending_outbounds_compat: u64 = 0;
54155501
for (_, outbound) in pending_outbound_payments.iter() {
5416-
num_pending_outbounds_compat += outbound.remaining_parts() as u64;
5502+
if !outbound.is_completed() {
5503+
num_pending_outbounds_compat += outbound.remaining_parts() as u64;
5504+
}
54175505
}
54185506
num_pending_outbounds_compat.write(writer)?;
54195507
for (_, outbound) in pending_outbound_payments.iter() {
@@ -5424,6 +5512,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54245512
session_priv.write(writer)?;
54255513
}
54265514
}
5515+
PendingOutboundPayment::Completed { .. }=> {},
54275516
}
54285517
}
54295518

@@ -5434,7 +5523,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54345523
PendingOutboundPayment::Legacy { session_privs } |
54355524
PendingOutboundPayment::Retryable { session_privs, .. } => {
54365525
pending_outbound_payments_no_retry.insert(*id, session_privs.clone());
5437-
}
5526+
},
5527+
_ => {},
54385528
}
54395529
}
54405530
write_tlv_fields!(writer, {

0 commit comments

Comments
 (0)