@@ -479,6 +479,17 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
479
479
/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
480
480
pub ( crate ) const EXPIRE_PREV_CONFIG_TICKS : usize = 5 ;
481
481
482
+ struct PendingChannelMonitorUpdate {
483
+ update : ChannelMonitorUpdate ,
484
+ /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] fly until after an
485
+ /// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] has
486
+ /// flown and we're waiting to hear back, otherwise the update is waiting on some external
487
+ /// event and the [`ChannelManager`] will update us when we're ready.
488
+ ///
489
+ /// [`ChannelManager`]: super::channelmanager::ChannelManager
490
+ flown : bool ,
491
+ }
492
+
482
493
// TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
483
494
// has been completed, and then turn into a Channel to get compiler-time enforcement of things like
484
495
// calling channel_id() before we're set up or things like get_outbound_funding_signed on an
@@ -740,7 +751,7 @@ pub(super) struct Channel<Signer: ChannelSigner> {
740
751
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
741
752
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
742
753
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
743
- pending_monitor_updates : Vec < ChannelMonitorUpdate > ,
754
+ pending_monitor_updates : Vec < PendingChannelMonitorUpdate > ,
744
755
}
745
756
746
757
#[ cfg( any( test, fuzzing) ) ]
@@ -1967,28 +1978,52 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
1967
1978
}
1968
1979
1969
1980
pub fn get_update_fulfill_htlc_and_commit < L : Deref > ( & mut self , htlc_id : u64 , payment_preimage : PaymentPreimage , logger : & L ) -> UpdateFulfillCommitFetch where L :: Target : Logger {
1981
+ let fly_cs_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
1970
1982
match self . get_update_fulfill_htlc ( htlc_id, payment_preimage, logger) {
1971
- UpdateFulfillFetch :: NewClaim { mut monitor_update, htlc_value_msat, msg : Some ( _) } => {
1972
- let mut additional_update = self . build_commitment_no_status_check ( logger) ;
1973
- // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
1974
- // strictly increasing by one, so decrement it here.
1975
- self . latest_monitor_update_id = monitor_update. update_id ;
1976
- monitor_update. updates . append ( & mut additional_update. updates ) ;
1977
- self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1978
- self . pending_monitor_updates . push ( monitor_update) ;
1983
+ UpdateFulfillFetch :: NewClaim { mut monitor_update, htlc_value_msat, msg } => {
1984
+ // Even if we aren't supposed to let new monitor updates with commitment state
1985
+ // updates fly, we still need to push the preimage ChannelMonitorUpdateStep no
1986
+ // matter what. Sadly, to push a new monitor update which flies before others
1987
+ // already queued, we have to insert it into the pending queue and update the
1988
+ // update_ids of all the following monitors.
1989
+ let flown_monitor_pos = if fly_cs_monitor && msg. is_some ( ) {
1990
+ // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
1991
+ // strictly increasing by one, so decrement it here.
1992
+ let mut additional_update = self . build_commitment_no_status_check ( logger) ;
1993
+ self . latest_monitor_update_id = monitor_update. update_id ;
1994
+ monitor_update. updates . append ( & mut additional_update. updates ) ;
1995
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
1996
+ update : monitor_update, flown : true ,
1997
+ } ) ;
1998
+ self . pending_monitor_updates . len ( ) - 1
1999
+ } else {
2000
+ let insert_pos = self . pending_monitor_updates . iter ( ) . position ( |upd| !upd. flown )
2001
+ . unwrap_or ( self . pending_monitor_updates . len ( ) ) ;
2002
+ let new_mon_id = self . pending_monitor_updates . get ( insert_pos)
2003
+ . map ( |upd| upd. update . update_id ) . unwrap_or ( monitor_update. update_id ) ;
2004
+ monitor_update. update_id = new_mon_id;
2005
+ self . pending_monitor_updates . insert ( insert_pos, PendingChannelMonitorUpdate {
2006
+ update : monitor_update, flown : true ,
2007
+ } ) ;
2008
+ for held_update in self . pending_monitor_updates . iter_mut ( ) . skip ( insert_pos + 1 ) {
2009
+ held_update. update . update_id += 1 ;
2010
+ }
2011
+ if msg. is_some ( ) {
2012
+ debug_assert ! ( false , "If there is a pending unflown monitor we should have AwaitingMonitorUpdate set" ) ;
2013
+ let update = self . build_commitment_no_status_check ( logger) ;
2014
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
2015
+ update, flown : false ,
2016
+ } ) ;
2017
+ }
2018
+ insert_pos
2019
+ } ;
2020
+ self . monitor_updating_paused ( false , msg. is_some ( ) , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1979
2021
UpdateFulfillCommitFetch :: NewClaim {
1980
- monitor_update : self . pending_monitor_updates . last ( ) . unwrap ( ) ,
2022
+ monitor_update : & self . pending_monitor_updates . get ( flown_monitor_pos)
2023
+ . expect ( "We just pushed the monitor update" ) . update ,
1981
2024
htlc_value_msat,
1982
2025
}
1983
2026
} ,
1984
- UpdateFulfillFetch :: NewClaim { monitor_update, htlc_value_msat, msg : None } => {
1985
- self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
1986
- self . pending_monitor_updates . push ( monitor_update) ;
1987
- UpdateFulfillCommitFetch :: NewClaim {
1988
- monitor_update : self . pending_monitor_updates . last ( ) . unwrap ( ) ,
1989
- htlc_value_msat,
1990
- }
1991
- }
1992
2027
UpdateFulfillFetch :: DuplicateClaim { } => UpdateFulfillCommitFetch :: DuplicateClaim { } ,
1993
2028
}
1994
2029
}
@@ -3054,7 +3089,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3054
3089
Ok ( ( ) )
3055
3090
}
3056
3091
3057
- pub fn commitment_signed < L : Deref > ( & mut self , msg : & msgs:: CommitmentSigned , logger : & L ) -> Result < & ChannelMonitorUpdate , ChannelError >
3092
+ pub fn commitment_signed < L : Deref > ( & mut self , msg : & msgs:: CommitmentSigned , logger : & L ) -> Result < Option < & ChannelMonitorUpdate > , ChannelError >
3058
3093
where L :: Target : Logger
3059
3094
{
3060
3095
if ( self . channel_state & ( ChannelState :: ChannelReady as u32 ) ) != ( ChannelState :: ChannelReady as u32 ) {
@@ -3230,8 +3265,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3230
3265
}
3231
3266
log_debug ! ( logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply." ,
3232
3267
log_bytes!( self . channel_id) ) ;
3233
- self . pending_monitor_updates . push ( monitor_update) ;
3234
- return Ok ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) ;
3268
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3269
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3270
+ update : monitor_update, flown : fly_monitor
3271
+ } ) ;
3272
+ return Ok ( if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) ;
3235
3273
}
3236
3274
3237
3275
let need_commitment_signed = if need_commitment && ( self . channel_state & ( ChannelState :: AwaitingRemoteRevoke as u32 ) ) == 0 {
@@ -3248,9 +3286,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3248
3286
3249
3287
log_debug ! ( logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack." ,
3250
3288
log_bytes!( self . channel_id( ) ) , if need_commitment_signed { " our own commitment_signed and" } else { "" } ) ;
3251
- self . pending_monitor_updates . push ( monitor_update) ;
3289
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3290
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3291
+ update : monitor_update, flown : fly_monitor,
3292
+ } ) ;
3252
3293
self . monitor_updating_paused ( true , need_commitment_signed, false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
3253
- return Ok ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) ;
3294
+ return Ok ( if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd . update ) } else { None } ) ;
3254
3295
}
3255
3296
3256
3297
/// Public version of the below, checking relevant preconditions first.
@@ -3365,8 +3406,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3365
3406
update_add_htlcs. len( ) , update_fulfill_htlcs. len( ) , update_fail_htlcs. len( ) ) ;
3366
3407
3367
3408
self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
3368
- self . pending_monitor_updates . push ( monitor_update) ;
3369
- ( Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) , htlcs_to_fail)
3409
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3410
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3411
+ update : monitor_update, flown : fly_monitor,
3412
+ } ) ;
3413
+ ( if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ,
3414
+ htlcs_to_fail)
3370
3415
} else {
3371
3416
( None , Vec :: new ( ) )
3372
3417
}
@@ -3377,7 +3422,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3377
3422
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
3378
3423
/// generating an appropriate error *after* the channel state has been updated based on the
3379
3424
/// revoke_and_ack message.
3380
- pub fn revoke_and_ack < L : Deref > ( & mut self , msg : & msgs:: RevokeAndACK , logger : & L ) -> Result < ( Vec < ( HTLCSource , PaymentHash ) > , & ChannelMonitorUpdate ) , ChannelError >
3425
+ pub fn revoke_and_ack < L : Deref > ( & mut self , msg : & msgs:: RevokeAndACK , logger : & L ) -> Result < ( Vec < ( HTLCSource , PaymentHash ) > , Option < & ChannelMonitorUpdate > ) , ChannelError >
3381
3426
where L :: Target : Logger ,
3382
3427
{
3383
3428
if ( self . channel_state & ( ChannelState :: ChannelReady as u32 ) ) != ( ChannelState :: ChannelReady as u32 ) {
@@ -3574,21 +3619,29 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3574
3619
self . monitor_pending_failures . append ( & mut revoked_htlcs) ;
3575
3620
self . monitor_pending_finalized_fulfills . append ( & mut finalized_claimed_htlcs) ;
3576
3621
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply." , log_bytes!( self . channel_id( ) ) ) ;
3577
- self . pending_monitor_updates . push ( monitor_update) ;
3578
- return Ok ( ( Vec :: new ( ) , self . pending_monitor_updates . last ( ) . unwrap ( ) ) ) ;
3622
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3623
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3624
+ update : monitor_update, flown : fly_monitor,
3625
+ } ) ;
3626
+ return Ok ( ( Vec :: new ( ) ,
3627
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) ) ;
3579
3628
}
3580
3629
3581
3630
match self . free_holding_cell_htlcs ( logger) {
3582
3631
( Some ( _) , htlcs_to_fail) => {
3583
- let mut additional_update = self . pending_monitor_updates . pop ( ) . unwrap ( ) ;
3632
+ let mut additional_update = self . pending_monitor_updates . pop ( ) . unwrap ( ) . update ;
3584
3633
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
3585
3634
// strictly increasing by one, so decrement it here.
3586
3635
self . latest_monitor_update_id = monitor_update. update_id ;
3587
3636
monitor_update. updates . append ( & mut additional_update. updates ) ;
3588
3637
3589
3638
self . monitor_updating_paused ( false , true , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3590
- self . pending_monitor_updates . push ( monitor_update) ;
3591
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3639
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3640
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3641
+ update : monitor_update, flown : fly_monitor,
3642
+ } ) ;
3643
+ Ok ( ( htlcs_to_fail,
3644
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3592
3645
} ,
3593
3646
( None , htlcs_to_fail) => {
3594
3647
if require_commitment {
@@ -3602,13 +3655,21 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3602
3655
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed." ,
3603
3656
log_bytes!( self . channel_id( ) ) , update_fail_htlcs. len( ) + update_fail_malformed_htlcs. len( ) ) ;
3604
3657
self . monitor_updating_paused ( false , true , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3605
- self . pending_monitor_updates . push ( monitor_update) ;
3606
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3658
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3659
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3660
+ update : monitor_update, flown : fly_monitor,
3661
+ } ) ;
3662
+ Ok ( ( htlcs_to_fail,
3663
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3607
3664
} else {
3608
3665
log_debug ! ( logger, "Received a valid revoke_and_ack for channel {} with no reply necessary." , log_bytes!( self . channel_id( ) ) ) ;
3609
3666
self . monitor_updating_paused ( false , false , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3610
- self . pending_monitor_updates . push ( monitor_update) ;
3611
- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3667
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
3668
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
3669
+ update : monitor_update, flown : fly_monitor,
3670
+ } ) ;
3671
+ Ok ( ( htlcs_to_fail,
3672
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } ) )
3612
3673
}
3613
3674
}
3614
3675
}
@@ -3797,7 +3858,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
3797
3858
{
3798
3859
assert_eq ! ( self . channel_state & ChannelState :: MonitorUpdateInProgress as u32 , ChannelState :: MonitorUpdateInProgress as u32 ) ;
3799
3860
self . channel_state &= !( ChannelState :: MonitorUpdateInProgress as u32 ) ;
3800
- self . pending_monitor_updates . clear ( ) ;
3861
+ let mut found_unflown = false ;
3862
+ self . pending_monitor_updates . retain ( |upd| {
3863
+ if found_unflown { debug_assert ! ( !upd. flown, "No mons may fly after one is paused" ) ; }
3864
+ if !upd. flown { found_unflown = true ; }
3865
+ !upd. flown
3866
+ } ) ;
3801
3867
3802
3868
// If we're past (or at) the FundingSent stage on an outbound channel, try to
3803
3869
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4338,8 +4404,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
4338
4404
} ] ,
4339
4405
} ;
4340
4406
self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
4341
- self . pending_monitor_updates . push ( monitor_update) ;
4342
- Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) )
4407
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
4408
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
4409
+ update : monitor_update, flown : fly_monitor,
4410
+ } ) ;
4411
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None }
4343
4412
} else { None } ;
4344
4413
let shutdown = if send_shutdown {
4345
4414
Some ( msgs:: Shutdown {
@@ -4889,8 +4958,25 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
4889
4958
( self . channel_state & ChannelState :: MonitorUpdateInProgress as u32 ) != 0
4890
4959
}
4891
4960
4892
- pub fn get_next_monitor_update ( & self ) -> Option < & ChannelMonitorUpdate > {
4893
- self . pending_monitor_updates . first ( )
4961
+ /// Returns the next unflown monitor update, if one exists, and a bool which indicates a
4962
+ /// further unflown monitor update exists after the next.
4963
+ pub fn fly_next_unflown_monitor_update ( & mut self ) -> Option < ( & ChannelMonitorUpdate , bool ) > {
4964
+ for i in 0 ..self . pending_monitor_updates . len ( ) {
4965
+ if !self . pending_monitor_updates [ i] . flown {
4966
+ self . pending_monitor_updates [ i] . flown = true ;
4967
+ return Some ( ( & self . pending_monitor_updates [ i] . update ,
4968
+ self . pending_monitor_updates . len ( ) > i + 1 ) ) ;
4969
+ }
4970
+ }
4971
+ None
4972
+ }
4973
+
4974
+ pub fn no_monitor_updates_pending ( & self ) -> bool {
4975
+ self . pending_monitor_updates . is_empty ( )
4976
+ }
4977
+
4978
+ pub fn complete_one_mon_update ( & mut self , update_id : u64 ) {
4979
+ self . pending_monitor_updates . retain ( |upd| upd. update . update_id != update_id) ;
4894
4980
}
4895
4981
4896
4982
/// Returns true if funding_created was sent/received.
@@ -5930,8 +6016,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
5930
6016
Some ( _) => {
5931
6017
let monitor_update = self . build_commitment_no_status_check ( logger) ;
5932
6018
self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
5933
- self . pending_monitor_updates . push ( monitor_update) ;
5934
- Ok ( Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
6019
+
6020
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
6021
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
6022
+ update : monitor_update, flown : fly_monitor,
6023
+ } ) ;
6024
+ Ok ( if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None } )
5935
6025
} ,
5936
6026
None => Ok ( None )
5937
6027
}
@@ -6020,8 +6110,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
6020
6110
} ] ,
6021
6111
} ;
6022
6112
self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
6023
- self . pending_monitor_updates . push ( monitor_update) ;
6024
- Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) )
6113
+ let fly_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| upd. flown ) ;
6114
+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
6115
+ update : monitor_update, flown : fly_monitor,
6116
+ } ) ;
6117
+ if fly_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None }
6025
6118
} else { None } ;
6026
6119
let shutdown = msgs:: Shutdown {
6027
6120
channel_id : self . channel_id ,
0 commit comments