Skip to content

Commit ce5dcba

Browse files
Add MPP ID to pending_outbound_htlcs
We'll use this to correlate MPP shards in upcoming commits
1 parent f01f9ac commit ce5dcba

File tree

1 file changed

+87
-48
lines changed

1 file changed

+87
-48
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 87 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -491,8 +491,11 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
491491
/// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
492492
/// after reloading from disk while replaying blocks against ChannelMonitors.
493493
///
494+
/// Each payment has each of its MPP fragment's session_priv bytes in the HashSet of the map
495+
/// (even payments over a single path).
496+
///
494497
/// Locked *after* channel_state.
495-
pending_outbound_payments: Mutex<HashSet<[u8; 32]>>,
498+
pending_outbound_payments: Mutex<HashMap<MppId, HashSet<[u8; 32]>>>,
496499

497500
our_network_key: SecretKey,
498501
our_network_pubkey: PublicKey,
@@ -1156,7 +1159,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
11561159
pending_msg_events: Vec::new(),
11571160
}),
11581161
pending_inbound_payments: Mutex::new(HashMap::new()),
1159-
pending_outbound_payments: Mutex::new(HashSet::new()),
1162+
pending_outbound_payments: Mutex::new(HashMap::new()),
11601163

11611164
our_network_key: keys_manager.get_node_secret(),
11621165
our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()),
@@ -1853,7 +1856,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
18531856
let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
18541857

18551858
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
1856-
assert!(self.pending_outbound_payments.lock().unwrap().insert(session_priv_bytes));
1859+
let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
1860+
let sessions = pending_outbounds.entry(mpp_id).or_insert(HashSet::new());
1861+
assert!(sessions.insert(session_priv_bytes));
18571862

18581863
let err: Result<(), _> = loop {
18591864
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2832,23 +2837,27 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
28322837
self.fail_htlc_backwards_internal(channel_state,
28332838
htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data});
28342839
},
2835-
HTLCSource::OutboundRoute { session_priv, .. } => {
2836-
if {
2837-
let mut session_priv_bytes = [0; 32];
2838-
session_priv_bytes.copy_from_slice(&session_priv[..]);
2839-
self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
2840-
} {
2841-
self.pending_events.lock().unwrap().push(
2842-
events::Event::PaymentFailed {
2843-
payment_hash,
2844-
rejected_by_dest: false,
2845-
network_update: None,
2846-
#[cfg(test)]
2847-
error_code: None,
2848-
#[cfg(test)]
2849-
error_data: None,
2840+
HTLCSource::OutboundRoute { session_priv, mpp_id, .. } => {
2841+
let mut session_priv_bytes = [0; 32];
2842+
session_priv_bytes.copy_from_slice(&session_priv[..]);
2843+
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
2844+
if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(mpp_id) {
2845+
if sessions.get_mut().remove(&session_priv_bytes) {
2846+
self.pending_events.lock().unwrap().push(
2847+
events::Event::PaymentFailed {
2848+
payment_hash,
2849+
rejected_by_dest: false,
2850+
network_update: None,
2851+
#[cfg(test)]
2852+
error_code: None,
2853+
#[cfg(test)]
2854+
error_data: None,
2855+
}
2856+
);
2857+
if sessions.get().len() == 0 {
2858+
sessions.remove();
28502859
}
2851-
)
2860+
}
28522861
} else {
28532862
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
28542863
}
@@ -2873,14 +2882,21 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
28732882
// from block_connected which may run during initialization prior to the chain_monitor
28742883
// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
28752884
match source {
2876-
HTLCSource::OutboundRoute { ref path, session_priv, .. } => {
2877-
if {
2878-
let mut session_priv_bytes = [0; 32];
2879-
session_priv_bytes.copy_from_slice(&session_priv[..]);
2880-
!self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
2881-
} {
2885+
HTLCSource::OutboundRoute { ref path, session_priv, mpp_id, .. } => {
2886+
let mut session_priv_bytes = [0; 32];
2887+
session_priv_bytes.copy_from_slice(&session_priv[..]);
2888+
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
2889+
if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(mpp_id) {
2890+
if !sessions.get_mut().remove(&session_priv_bytes) {
2891+
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
2892+
return;
2893+
}
2894+
if sessions.get().len() == 0 {
2895+
sessions.remove();
2896+
}
2897+
} else {
28822898
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
2883-
return;
2899+
return
28842900
}
28852901
log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
28862902
mem::drop(channel_state_lock);
@@ -3119,17 +3135,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
31193135

31203136
fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
31213137
match source {
3122-
HTLCSource::OutboundRoute { session_priv, .. } => {
3138+
HTLCSource::OutboundRoute { session_priv, mpp_id, .. } => {
31233139
mem::drop(channel_state_lock);
3124-
if {
3125-
let mut session_priv_bytes = [0; 32];
3126-
session_priv_bytes.copy_from_slice(&session_priv[..]);
3127-
self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
3128-
} {
3129-
let mut pending_events = self.pending_events.lock().unwrap();
3130-
pending_events.push(events::Event::PaymentSent {
3131-
payment_preimage
3132-
});
3140+
let mut session_priv_bytes = [0; 32];
3141+
session_priv_bytes.copy_from_slice(&session_priv[..]);
3142+
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
3143+
if let Some(sessions) = outbounds.get_mut(&mpp_id) {
3144+
if sessions.remove(&session_priv_bytes) {
3145+
self.pending_events.lock().unwrap().push(
3146+
events::Event::PaymentSent { payment_preimage }
3147+
);
3148+
if sessions.len() == 0 {
3149+
outbounds.remove(&mpp_id);
3150+
}
3151+
} else {
3152+
log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
3153+
}
31333154
} else {
31343155
log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
31353156
}
@@ -5105,12 +5126,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
51055126
}
51065127

51075128
let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
5108-
(pending_outbound_payments.len() as u64).write(writer)?;
5109-
for session_priv in pending_outbound_payments.iter() {
5110-
session_priv.write(writer)?;
5129+
// For backwards compat, write the session privs and their total length.
5130+
let mut num_pending_outbounds_compat: u64 = 0;
5131+
for (_, outbounds) in pending_outbound_payments.iter() {
5132+
num_pending_outbounds_compat += outbounds.len() as u64;
5133+
}
5134+
num_pending_outbounds_compat.write(writer)?;
5135+
for (_, outbounds) in pending_outbound_payments.iter() {
5136+
for outbound in outbounds.iter() {
5137+
outbound.write(writer)?;
5138+
}
51115139
}
51125140

5113-
write_tlv_fields!(writer, {});
5141+
let pending_outbound_payments = Some(pending_outbound_payments);
5142+
write_tlv_fields!(writer, {
5143+
(0, pending_outbound_payments, option),
5144+
});
51145145

51155146
Ok(())
51165147
}
@@ -5363,15 +5394,23 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
53635394
}
53645395
}
53655396

5366-
let pending_outbound_payments_count: u64 = Readable::read(reader)?;
5367-
let mut pending_outbound_payments: HashSet<[u8; 32]> = HashSet::with_capacity(cmp::min(pending_outbound_payments_count as usize, MAX_ALLOC_SIZE/32));
5368-
for _ in 0..pending_outbound_payments_count {
5369-
if !pending_outbound_payments.insert(Readable::read(reader)?) {
5370-
return Err(DecodeError::InvalidValue);
5371-
}
5397+
let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
5398+
let mut pending_outbound_payments_compat: HashMap<MppId, HashSet<[u8; 32]>> =
5399+
HashMap::with_capacity(cmp::min(pending_outbound_payments_count_compat as usize, MAX_ALLOC_SIZE/32));
5400+
for _ in 0..pending_outbound_payments_count_compat {
5401+
let session_priv = Readable::read(reader)?;
5402+
if pending_outbound_payments_compat.insert(MppId(session_priv), [session_priv].iter().cloned().collect()).is_some() {
5403+
return Err(DecodeError::InvalidValue)
5404+
};
53725405
}
53735406

5374-
read_tlv_fields!(reader, {});
5407+
let mut pending_outbound_payments = None;
5408+
read_tlv_fields!(reader, {
5409+
(0, pending_outbound_payments, option),
5410+
});
5411+
if pending_outbound_payments.is_none() {
5412+
pending_outbound_payments = Some(pending_outbound_payments_compat);
5413+
}
53755414

53765415
let mut secp_ctx = Secp256k1::new();
53775416
secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());
@@ -5392,7 +5431,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
53925431
pending_msg_events: Vec::new(),
53935432
}),
53945433
pending_inbound_payments: Mutex::new(pending_inbound_payments),
5395-
pending_outbound_payments: Mutex::new(pending_outbound_payments),
5434+
pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
53965435

53975436
our_network_key: args.keys_manager.get_node_secret(),
53985437
our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &args.keys_manager.get_node_secret()),

0 commit comments

Comments
 (0)