@@ -1706,23 +1706,39 @@ macro_rules! process_events_body {
1706
1706
}
1707
1707
1708
1708
let _single_processor = $self. pending_events_processor. lock( ) . unwrap( ) ;
1709
- let mut next_event = $self. pending_events. lock( ) . unwrap( ) . front( ) . map( |ev| ( * ev) . clone( ) ) ;
1710
1709
loop {
1711
- if let Some ( ( event, _action) ) = next_event {
1712
- result = NotifyOption :: DoPersist ;
1713
- let ev_clone;
1714
- #[ cfg( debug_assertions) ] {
1715
- ev_clone = event. clone( ) ;
1710
+ let mut next_event = $self. pending_events. lock( ) . unwrap( ) . front( ) . map( |ev| ( * ev) . clone( ) ) ;
1711
+ let mut post_event_actions = Vec :: new( ) ;
1712
+ loop {
1713
+ if let Some ( ( event, action_opt) ) = next_event {
1714
+ result = NotifyOption :: DoPersist ;
1715
+ let _ev_clone: Event ;
1716
+ #[ cfg( debug_assertions) ] {
1717
+ _ev_clone = event. clone( ) ;
1718
+ }
1719
+ $event_to_handle = event;
1720
+ $handle_event;
1721
+ let mut pending_events = $self. pending_events. lock( ) . unwrap( ) ;
1722
+ // We're required to take the `pending_events_processor` lock any time we
1723
+ // change `pending_events`, however have no general way to enforce that.
1724
+ // Instead, here, we assert that the next event hasn't changed out from under
1725
+ // us.
1726
+ #[ cfg( debug_assertions) ] {
1727
+ debug_assert_eq!( _ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
1728
+ }
1729
+ debug_assert_eq!( action_opt, pending_events. front( ) . unwrap( ) . 1 ) ;
1730
+ if let Some ( action) = action_opt {
1731
+ post_event_actions. push( action) ;
1732
+ }
1733
+ pending_events. pop_front( ) ;
1734
+ next_event = pending_events. front( ) . map( |ev| ev. clone( ) ) ;
1735
+ } else {
1736
+ break ;
1716
1737
}
1717
- $event_to_handle = event;
1718
- $handle_event;
1719
- let mut pending_events = $self. pending_events. lock( ) . unwrap( ) ;
1720
- debug_assert_eq!( ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
1721
- pending_events. pop_front( ) ;
1722
- next_event = pending_events. front( ) . map( |ev| ev. clone( ) ) ;
1723
- } else {
1724
- break ;
1725
1738
}
1739
+ if post_event_actions. is_empty( ) { break ; }
1740
+ $self. handle_post_event_actions( post_event_actions) ;
1741
+ // If we had some actions, go around again as we may have more events now
1726
1742
}
1727
1743
1728
1744
if result == NotifyOption :: DoPersist {
@@ -5860,6 +5876,66 @@ where
5860
5876
self . pending_outbound_payments . clear_pending_payments ( )
5861
5877
}
5862
5878
5879
+ fn handle_monitor_update_release ( & self , counterparty_node_id : PublicKey , channel_funding_outpoint : OutPoint ) {
5880
+ loop {
5881
+ let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5882
+ if let Some ( peer_state_mtx) = per_peer_state. get ( & counterparty_node_id) {
5883
+ let mut peer_state_lck = peer_state_mtx. lock ( ) . unwrap ( ) ;
5884
+ let peer_state = & mut * peer_state_lck;
5885
+ if self . pending_events . lock ( ) . unwrap ( ) . iter ( )
5886
+ . any ( |( _ev, action_opt) | action_opt == & Some ( EventCompletionAction :: ReleaseRAAChannelMonitorUpdate {
5887
+ channel_funding_outpoint, counterparty_node_id
5888
+ } ) )
5889
+ {
5890
+ // Check that, while holding the peer lock, we don't have another event
5891
+ // blocking any monitor updates for this channel. If we do, let those
5892
+ // events be the ones that ultimately release the monitor update(s).
5893
+ log_trace ! ( self . logger, "Delaying monitor unlock for channel {} as another event is pending" ,
5894
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5895
+ return ;
5896
+ }
5897
+ if let hash_map:: Entry :: Occupied ( mut chan) = peer_state. channel_by_id . entry ( channel_funding_outpoint. to_channel_id ( ) ) {
5898
+ debug_assert_eq ! ( chan. get( ) . get_funding_txo( ) . unwrap( ) , channel_funding_outpoint) ;
5899
+ if let Some ( ( monitor_update, further_update_exists) ) = chan. get_mut ( ) . fly_next_unflown_monitor_update ( ) {
5900
+ log_debug ! ( self . logger, "Unlocking monitor updating for channel {} and updating monitor" ,
5901
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5902
+ let update_res = self . chain_monitor . update_channel ( channel_funding_outpoint, monitor_update) ;
5903
+ let update_id = monitor_update. update_id ;
5904
+ let _ = handle_error ! ( self ,
5905
+ handle_new_monitor_update!( self , update_res, update_id,
5906
+ peer_state_lck, peer_state, per_peer_state, chan) ,
5907
+ counterparty_node_id) ;
5908
+ if further_update_exists {
5909
+ // If there are more `ChannelMonitorUpdate`s to process, restart at the
5910
+ // top of the loop.
5911
+ continue ;
5912
+ }
5913
+ } else {
5914
+ log_trace ! ( self . logger, "Unlocked monitor updating for channel {} without monitors to update" ,
5915
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5916
+ }
5917
+ }
5918
+ } else {
5919
+ log_debug ! ( self . logger,
5920
+ "Got a release post-RAA monitor update for peer {} but the channel is gone" ,
5921
+ log_pubkey!( counterparty_node_id) ) ;
5922
+ }
5923
+ break ;
5924
+ }
5925
+ }
5926
+
5927
+ fn handle_post_event_actions ( & self , actions : Vec < EventCompletionAction > ) {
5928
+ for action in actions {
5929
+ match action {
5930
+ EventCompletionAction :: ReleaseRAAChannelMonitorUpdate {
5931
+ channel_funding_outpoint, counterparty_node_id
5932
+ } => {
5933
+ self . handle_monitor_update_release ( counterparty_node_id, channel_funding_outpoint) ;
5934
+ }
5935
+ }
5936
+ }
5937
+ }
5938
+
5863
5939
/// Processes any events asynchronously in the order they were generated since the last call
5864
5940
/// using the given event handler.
5865
5941
///
0 commit comments