@@ -491,8 +491,11 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
491
491
/// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
492
492
/// after reloading from disk while replaying blocks against ChannelMonitors.
493
493
///
494
+ /// Each payment has each of its MPP fragment's session_priv bytes in the HashSet of the map
495
+ /// (even payments over a single path).
496
+ ///
494
497
/// Locked *after* channel_state.
495
- pending_outbound_payments : Mutex < HashSet < [ u8 ; 32 ] > > ,
498
+ pending_outbound_payments : Mutex < HashMap < MppId , HashSet < [ u8 ; 32 ] > > > ,
496
499
497
500
our_network_key : SecretKey ,
498
501
our_network_pubkey : PublicKey ,
@@ -1156,7 +1159,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
1156
1159
pending_msg_events : Vec :: new ( ) ,
1157
1160
} ) ,
1158
1161
pending_inbound_payments : Mutex :: new ( HashMap :: new ( ) ) ,
1159
- pending_outbound_payments : Mutex :: new ( HashSet :: new ( ) ) ,
1162
+ pending_outbound_payments : Mutex :: new ( HashMap :: new ( ) ) ,
1160
1163
1161
1164
our_network_key : keys_manager. get_node_secret ( ) ,
1162
1165
our_network_pubkey : PublicKey :: from_secret_key ( & secp_ctx, & keys_manager. get_node_secret ( ) ) ,
@@ -1853,7 +1856,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
1853
1856
let onion_packet = onion_utils:: construct_onion_packet ( onion_payloads, onion_keys, prng_seed, payment_hash) ;
1854
1857
1855
1858
let _persistence_guard = PersistenceNotifierGuard :: notify_on_drop ( & self . total_consistency_lock , & self . persistence_notifier ) ;
1856
- assert ! ( self . pending_outbound_payments. lock( ) . unwrap( ) . insert( session_priv_bytes) ) ;
1859
+ let mut pending_outbounds = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
1860
+ let sessions = pending_outbounds. entry ( mpp_id) . or_insert ( HashSet :: new ( ) ) ;
1861
+ assert ! ( sessions. insert( session_priv_bytes) ) ;
1857
1862
1858
1863
let err: Result < ( ) , _ > = loop {
1859
1864
let mut channel_lock = self . channel_state . lock ( ) . unwrap ( ) ;
@@ -2832,23 +2837,27 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
2832
2837
self . fail_htlc_backwards_internal ( channel_state,
2833
2838
htlc_src, & payment_hash, HTLCFailReason :: Reason { failure_code, data : onion_failure_data} ) ;
2834
2839
} ,
2835
- HTLCSource :: OutboundRoute { session_priv, .. } => {
2836
- if {
2837
- let mut session_priv_bytes = [ 0 ; 32 ] ;
2838
- session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
2839
- self . pending_outbound_payments . lock ( ) . unwrap ( ) . remove ( & session_priv_bytes)
2840
- } {
2841
- self . pending_events . lock ( ) . unwrap ( ) . push (
2842
- events:: Event :: PaymentFailed {
2843
- payment_hash,
2844
- rejected_by_dest : false ,
2845
- network_update : None ,
2846
- #[ cfg( test) ]
2847
- error_code : None ,
2848
- #[ cfg( test) ]
2849
- error_data : None ,
2840
+ HTLCSource :: OutboundRoute { session_priv, mpp_id, .. } => {
2841
+ let mut session_priv_bytes = [ 0 ; 32 ] ;
2842
+ session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
2843
+ let mut outbounds = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
2844
+ if let Some ( sessions) = outbounds. get_mut ( & mpp_id) {
2845
+ if sessions. remove ( & session_priv_bytes) {
2846
+ self . pending_events . lock ( ) . unwrap ( ) . push (
2847
+ events:: Event :: PaymentFailed {
2848
+ payment_hash,
2849
+ rejected_by_dest : false ,
2850
+ network_update : None ,
2851
+ #[ cfg( test) ]
2852
+ error_code : None ,
2853
+ #[ cfg( test) ]
2854
+ error_data : None ,
2855
+ }
2856
+ ) ;
2857
+ if sessions. len ( ) == 0 {
2858
+ outbounds. remove ( & mpp_id) ;
2850
2859
}
2851
- )
2860
+ }
2852
2861
} else {
2853
2862
log_trace ! ( self . logger, "Received duplicative fail for HTLC with payment_hash {}" , log_bytes!( payment_hash. 0 ) ) ;
2854
2863
}
@@ -2873,14 +2882,21 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
2873
2882
// from block_connected which may run during initialization prior to the chain_monitor
2874
2883
// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
2875
2884
match source {
2876
- HTLCSource :: OutboundRoute { ref path, session_priv, .. } => {
2877
- if {
2878
- let mut session_priv_bytes = [ 0 ; 32 ] ;
2879
- session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
2880
- !self . pending_outbound_payments . lock ( ) . unwrap ( ) . remove ( & session_priv_bytes)
2881
- } {
2885
+ HTLCSource :: OutboundRoute { ref path, session_priv, mpp_id, .. } => {
2886
+ let mut session_priv_bytes = [ 0 ; 32 ] ;
2887
+ session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
2888
+ let mut outbounds = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
2889
+ if let Some ( sessions) = outbounds. get_mut ( & mpp_id) {
2890
+ if !sessions. remove ( & session_priv_bytes) {
2891
+ log_trace ! ( self . logger, "Received duplicative fail for HTLC with payment_hash {}" , log_bytes!( payment_hash. 0 ) ) ;
2892
+ return ;
2893
+ }
2894
+ if sessions. len ( ) == 0 {
2895
+ outbounds. remove ( & mpp_id) ;
2896
+ }
2897
+ } else {
2882
2898
log_trace ! ( self . logger, "Received duplicative fail for HTLC with payment_hash {}" , log_bytes!( payment_hash. 0 ) ) ;
2883
- return ;
2899
+ return
2884
2900
}
2885
2901
log_trace ! ( self . logger, "Failing outbound payment HTLC with payment_hash {}" , log_bytes!( payment_hash. 0 ) ) ;
2886
2902
mem:: drop ( channel_state_lock) ;
@@ -3119,17 +3135,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
3119
3135
3120
3136
fn claim_funds_internal ( & self , mut channel_state_lock : MutexGuard < ChannelHolder < Signer > > , source : HTLCSource , payment_preimage : PaymentPreimage , forwarded_htlc_value_msat : Option < u64 > , from_onchain : bool ) {
3121
3137
match source {
3122
- HTLCSource :: OutboundRoute { session_priv, .. } => {
3138
+ HTLCSource :: OutboundRoute { session_priv, mpp_id , .. } => {
3123
3139
mem:: drop ( channel_state_lock) ;
3124
- if {
3125
- let mut session_priv_bytes = [ 0 ; 32 ] ;
3126
- session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
3127
- self . pending_outbound_payments . lock ( ) . unwrap ( ) . remove ( & session_priv_bytes)
3128
- } {
3129
- let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
3130
- pending_events. push ( events:: Event :: PaymentSent {
3131
- payment_preimage
3132
- } ) ;
3140
+ let mut session_priv_bytes = [ 0 ; 32 ] ;
3141
+ session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
3142
+ let mut outbounds = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
3143
+ if let Some ( sessions) = outbounds. get_mut ( & mpp_id) {
3144
+ if sessions. remove ( & session_priv_bytes) {
3145
+ self . pending_events . lock ( ) . unwrap ( ) . push (
3146
+ events:: Event :: PaymentSent { payment_preimage }
3147
+ ) ;
3148
+ if sessions. len ( ) == 0 {
3149
+ outbounds. remove ( & mpp_id) ;
3150
+ }
3151
+ } else {
3152
+ log_trace ! ( self . logger, "Received duplicative fulfill for HTLC with payment_preimage {}" , log_bytes!( payment_preimage. 0 ) ) ;
3153
+ }
3133
3154
} else {
3134
3155
log_trace ! ( self . logger, "Received duplicative fulfill for HTLC with payment_preimage {}" , log_bytes!( payment_preimage. 0 ) ) ;
3135
3156
}
@@ -5104,13 +5125,25 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
5104
5125
pending_payment. write ( writer) ?;
5105
5126
}
5106
5127
5107
- let pending_outbound_payments = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
5108
- ( pending_outbound_payments. len ( ) as u64 ) . write ( writer) ?;
5109
- for session_priv in pending_outbound_payments. iter ( ) {
5110
- session_priv. write ( writer) ?;
5128
+ let pending_outbounds_compat = {
5129
+ let mut pending_session_privs: HashSet < [ u8 ; 32 ] > = HashSet :: new ( ) ;
5130
+ let pending_outbound_payments = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
5131
+ for ( _, outbounds) in pending_outbound_payments. iter ( ) {
5132
+ for outbound in outbounds. iter ( ) {
5133
+ pending_session_privs. insert ( * outbound) ;
5134
+ }
5135
+ }
5136
+ pending_session_privs
5137
+ } ;
5138
+ ( pending_outbounds_compat. len ( ) as u64 ) . write ( writer) ?;
5139
+ for outbound in pending_outbounds_compat {
5140
+ outbound. write ( writer) ?;
5111
5141
}
5112
5142
5113
- write_tlv_fields ! ( writer, { } ) ;
5143
+ let pending_outbound_payments = Some ( self . pending_outbound_payments . lock ( ) . unwrap ( ) . clone ( ) ) ;
5144
+ write_tlv_fields ! ( writer, {
5145
+ ( 0 , pending_outbound_payments, option) ,
5146
+ } ) ;
5114
5147
5115
5148
Ok ( ( ) )
5116
5149
}
@@ -5363,15 +5396,23 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
5363
5396
}
5364
5397
}
5365
5398
5366
- let pending_outbound_payments_count: u64 = Readable :: read ( reader) ?;
5367
- let mut pending_outbound_payments: HashSet < [ u8 ; 32 ] > = HashSet :: with_capacity ( cmp:: min ( pending_outbound_payments_count as usize , MAX_ALLOC_SIZE /32 ) ) ;
5368
- for _ in 0 ..pending_outbound_payments_count {
5369
- if !pending_outbound_payments. insert ( Readable :: read ( reader) ?) {
5370
- return Err ( DecodeError :: InvalidValue ) ;
5371
- }
5399
+ let pending_outbound_payments_count_compat: u64 = Readable :: read ( reader) ?;
5400
+ let mut pending_outbound_payments_compat: HashMap < MppId , HashSet < [ u8 ; 32 ] > > =
5401
+ HashMap :: with_capacity ( cmp:: min ( pending_outbound_payments_count_compat as usize , MAX_ALLOC_SIZE /32 ) ) ;
5402
+ for _ in 0 ..pending_outbound_payments_count_compat {
5403
+ let session_priv = Readable :: read ( reader) ?;
5404
+ if pending_outbound_payments_compat. insert ( MppId ( session_priv) , [ session_priv] . iter ( ) . cloned ( ) . collect ( ) ) . is_some ( ) {
5405
+ return Err ( DecodeError :: InvalidValue )
5406
+ } ;
5372
5407
}
5373
5408
5374
- read_tlv_fields ! ( reader, { } ) ;
5409
+ let mut pending_outbound_payments = None ;
5410
+ read_tlv_fields ! ( reader, {
5411
+ ( 0 , pending_outbound_payments, option) ,
5412
+ } ) ;
5413
+ if pending_outbound_payments. is_none ( ) {
5414
+ pending_outbound_payments = Some ( pending_outbound_payments_compat) ;
5415
+ }
5375
5416
5376
5417
let mut secp_ctx = Secp256k1 :: new ( ) ;
5377
5418
secp_ctx. seeded_randomize ( & args. keys_manager . get_secure_random_bytes ( ) ) ;
@@ -5392,7 +5433,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
5392
5433
pending_msg_events : Vec :: new ( ) ,
5393
5434
} ) ,
5394
5435
pending_inbound_payments : Mutex :: new ( pending_inbound_payments) ,
5395
- pending_outbound_payments : Mutex :: new ( pending_outbound_payments) ,
5436
+ pending_outbound_payments : Mutex :: new ( pending_outbound_payments. unwrap ( ) ) ,
5396
5437
5397
5438
our_network_key : args. keys_manager . get_node_secret ( ) ,
5398
5439
our_network_pubkey : PublicKey :: from_secret_key ( & secp_ctx, & args. keys_manager . get_node_secret ( ) ) ,
0 commit comments