Skip to content

Commit 7dfd4be

Browse files
channelmanager: Add retry data to pending_outbound_payments
1 parent 6b3b25f commit 7dfd4be

File tree

1 file changed

+119
-25
lines changed

1 file changed

+119
-25
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 119 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,64 @@ struct PendingInboundPayment {
400400
min_value_msat: Option<u64>,
401401
}
402402

403+
/// Stores the session_priv for each part of a payment which is still pending. For versions 0.0.102
404+
/// and later, stores information for retrying the payment.
405+
enum PendingOutboundPayment {
406+
Legacy {
407+
session_privs: HashSet<[u8; 32]>,
408+
},
409+
Retryable {
410+
session_privs: HashSet<[u8; 32]>,
411+
payment_hash: PaymentHash,
412+
payment_secret: Option<PaymentSecret>,
413+
pending_amt_msat: u64,
414+
/// The total payment amount across all paths, used to verify that a retry is not overpaying or
415+
/// underpaying.
416+
total_msat: u64,
417+
},
418+
}
419+
420+
impl PendingOutboundPayment {
421+
fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: u64) -> bool {
422+
let remove_res = match self {
423+
PendingOutboundPayment::Legacy { session_privs } |
424+
PendingOutboundPayment::Retryable { session_privs, .. } => {
425+
session_privs.remove(session_priv)
426+
}
427+
};
428+
if remove_res {
429+
if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
430+
*pending_amt_msat -= part_amt_msat;
431+
}
432+
}
433+
remove_res
434+
}
435+
436+
fn insert(&mut self, session_priv: [u8; 32], part_amt_msat: u64) -> bool {
437+
let insert_res = match self {
438+
PendingOutboundPayment::Legacy { session_privs } |
439+
PendingOutboundPayment::Retryable { session_privs, .. } => {
440+
session_privs.insert(session_priv)
441+
}
442+
};
443+
if insert_res {
444+
if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
445+
*pending_amt_msat += part_amt_msat;
446+
}
447+
}
448+
insert_res
449+
}
450+
451+
fn num_parts(&self) -> usize {
452+
match self {
453+
PendingOutboundPayment::Legacy { session_privs } |
454+
PendingOutboundPayment::Retryable { session_privs, .. } => {
455+
session_privs.len()
456+
}
457+
}
458+
}
459+
}
460+
403461
/// SimpleArcChannelManager is useful when you need a ChannelManager with a static lifetime, e.g.
404462
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
405463
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
@@ -486,7 +544,7 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
486544
/// Locked *after* channel_state.
487545
pending_inbound_payments: Mutex<HashMap<PaymentHash, PendingInboundPayment>>,
488546

489-
/// The session_priv bytes of outbound payments which are pending resolution.
547+
/// The session_priv bytes and retry metadata of outbound payments which are pending resolution.
490548
/// The authoritative state of these HTLCs resides either within Channels or ChannelMonitors
491549
/// (if the channel has been force-closed), however we track them here to prevent duplicative
492550
/// PaymentSent/PaymentPathFailed events. Specifically, in the case of a duplicative
@@ -495,11 +553,10 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
495553
/// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
496554
/// after reloading from disk while replaying blocks against ChannelMonitors.
497555
///
498-
/// Each payment has each of its MPP part's session_priv bytes in the HashSet of the map (even
499-
/// payments over a single path).
556+
/// See `PendingOutboundPayment` documentation for more info.
500557
///
501558
/// Locked *after* channel_state.
502-
pending_outbound_payments: Mutex<HashMap<PaymentId, HashSet<[u8; 32]>>>,
559+
pending_outbound_payments: Mutex<HashMap<PaymentId, PendingOutboundPayment>>,
503560

504561
our_network_key: SecretKey,
505562
our_network_pubkey: PublicKey,
@@ -1894,8 +1951,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
18941951

18951952
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
18961953
let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
1897-
let sessions = pending_outbounds.entry(payment_id).or_insert(HashSet::new());
1898-
assert!(sessions.insert(session_priv_bytes));
1954+
let payment = pending_outbounds.entry(payment_id).or_insert(PendingOutboundPayment::Retryable {
1955+
session_privs: HashSet::new(),
1956+
pending_amt_msat: 0,
1957+
payment_hash: *payment_hash,
1958+
payment_secret: *payment_secret,
1959+
total_msat: total_value,
1960+
});
1961+
payment.insert(session_priv_bytes, path.last().unwrap().fee_msat);
18991962

19001963
let err: Result<(), _> = loop {
19011964
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2883,23 +2946,23 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
28832946
let mut session_priv_bytes = [0; 32];
28842947
session_priv_bytes.copy_from_slice(&session_priv[..]);
28852948
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
2886-
if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
2887-
if sessions.get_mut().remove(&session_priv_bytes) {
2949+
if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
2950+
if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
28882951
self.pending_events.lock().unwrap().push(
28892952
events::Event::PaymentPathFailed {
28902953
payment_hash,
28912954
rejected_by_dest: false,
28922955
network_update: None,
2893-
all_paths_failed: sessions.get().len() == 0,
2956+
all_paths_failed: payment.get().num_parts() == 0,
28942957
path: path.clone(),
28952958
#[cfg(test)]
28962959
error_code: None,
28972960
#[cfg(test)]
28982961
error_data: None,
28992962
}
29002963
);
2901-
if sessions.get().len() == 0 {
2902-
sessions.remove();
2964+
if payment.get().num_parts() == 0 {
2965+
payment.remove();
29032966
}
29042967
}
29052968
} else {
@@ -2932,11 +2995,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
29322995
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
29332996
let mut all_paths_failed = false;
29342997
if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
2935-
if !sessions.get_mut().remove(&session_priv_bytes) {
2998+
if !sessions.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
29362999
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
29373000
return;
29383001
}
2939-
if sessions.get().len() == 0 {
3002+
if sessions.get().num_parts() == 0 {
29403003
all_paths_failed = true;
29413004
sessions.remove();
29423005
}
@@ -3185,13 +3248,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
31853248

31863249
fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
31873250
match source {
3188-
HTLCSource::OutboundRoute { session_priv, payment_id, .. } => {
3251+
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
31893252
mem::drop(channel_state_lock);
31903253
let mut session_priv_bytes = [0; 32];
31913254
session_priv_bytes.copy_from_slice(&session_priv[..]);
31923255
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
31933256
let found_payment = if let Some(mut sessions) = outbounds.remove(&payment_id) {
3194-
sessions.remove(&session_priv_bytes)
3257+
sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat)
31953258
} else { false };
31963259
if found_payment {
31973260
self.pending_events.lock().unwrap().push(
@@ -5161,6 +5224,19 @@ impl_writeable_tlv_based!(PendingInboundPayment, {
51615224
(8, min_value_msat, required),
51625225
});
51635226

5227+
impl_writeable_tlv_based_enum!(PendingOutboundPayment,
5228+
(0, Legacy) => {
5229+
(0, session_privs, required),
5230+
},
5231+
(2, Retryable) => {
5232+
(0, session_privs, required),
5233+
(2, payment_hash, required),
5234+
(4, payment_secret, option),
5235+
(6, total_msat, required),
5236+
(8, pending_amt_msat, required),
5237+
},
5238+
;);
5239+
51645240
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
51655241
where M::Target: chain::Watch<Signer>,
51665242
T::Target: BroadcasterInterface,
@@ -5251,18 +5327,25 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
52515327
let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
52525328
// For backwards compat, write the session privs and their total length.
52535329
let mut num_pending_outbounds_compat: u64 = 0;
5254-
for (_, outbounds) in pending_outbound_payments.iter() {
5255-
num_pending_outbounds_compat += outbounds.len() as u64;
5330+
for (_, outbound) in pending_outbound_payments.iter() {
5331+
num_pending_outbounds_compat += outbound.num_parts() as u64;
52565332
}
52575333
num_pending_outbounds_compat.write(writer)?;
5258-
for (_, outbounds) in pending_outbound_payments.iter() {
5259-
for outbound in outbounds.iter() {
5260-
outbound.write(writer)?;
5334+
for (_, outbound) in pending_outbound_payments.iter() {
5335+
match outbound {
5336+
PendingOutboundPayment::Legacy { session_privs } |
5337+
PendingOutboundPayment::Retryable { session_privs, .. } => {
5338+
for session_priv in session_privs.iter() {
5339+
session_priv.write(writer)?;
5340+
}
5341+
}
52615342
}
52625343
}
52635344

5345+
let pending_outbound_payments_compat_2: HashMap<PaymentId, HashSet<[u8; 32]>> = HashMap::new();
52645346
write_tlv_fields!(writer, {
5265-
(1, pending_outbound_payments, required),
5347+
(1, pending_outbound_payments_compat_2, required),
5348+
(3, pending_outbound_payments, required),
52665349
});
52675350

52685351
Ok(())
@@ -5522,21 +5605,32 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
55225605
}
55235606

55245607
let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
5525-
let mut pending_outbound_payments_compat: HashMap<PaymentId, HashSet<[u8; 32]>> =
5608+
let mut pending_outbound_payments_compat: HashMap<PaymentId, PendingOutboundPayment> =
55265609
HashMap::with_capacity(cmp::min(pending_outbound_payments_count_compat as usize, MAX_ALLOC_SIZE/32));
55275610
for _ in 0..pending_outbound_payments_count_compat {
55285611
let session_priv = Readable::read(reader)?;
5529-
if pending_outbound_payments_compat.insert(PaymentId(session_priv), [session_priv].iter().cloned().collect()).is_some() {
5612+
let payment = PendingOutboundPayment::Legacy {
5613+
session_privs: [session_priv].iter().cloned().collect()
5614+
};
5615+
if pending_outbound_payments_compat.insert(PaymentId(session_priv), payment).is_some() {
55305616
return Err(DecodeError::InvalidValue)
55315617
};
55325618
}
55335619

5620+
let mut pending_outbound_payments_compat_2: Option<HashMap<PaymentId, HashSet<[u8; 32]>>> = None;
55345621
let mut pending_outbound_payments = None;
55355622
read_tlv_fields!(reader, {
5536-
(1, pending_outbound_payments, option),
5623+
(1, pending_outbound_payments_compat_2, option),
5624+
(3, pending_outbound_payments, option),
55375625
});
5538-
if pending_outbound_payments.is_none() {
5626+
if pending_outbound_payments.is_none() && pending_outbound_payments_compat_2.is_none() {
55395627
pending_outbound_payments = Some(pending_outbound_payments_compat);
5628+
} else if pending_outbound_payments.is_none() {
5629+
let mut outbounds = HashMap::new();
5630+
for (id, session_privs) in pending_outbound_payments_compat_2.unwrap().drain() {
5631+
outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
5632+
}
5633+
pending_outbound_payments = Some(outbounds);
55405634
}
55415635

55425636
let mut secp_ctx = Secp256k1::new();

0 commit comments

Comments
 (0)