Skip to content

Commit 1d94baf

Browse files
committed
Handle EventCompletionActions after events complete
This adds handling of the new `EventCompletionAction`s after `Event`s are handled, letting `ChannelMonitorUpdate`s which were blocked fly after a relevant `Event`.
1 parent e3dc1de commit 1d94baf

File tree

1 file changed

+110
-26
lines changed

1 file changed

+110
-26
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 110 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5737,6 +5737,66 @@ where
57375737
self.pending_outbound_payments.clear_pending_payments()
57385738
}
57395739

5740+
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
5741+
loop {
5742+
let per_peer_state = self.per_peer_state.read().unwrap();
5743+
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
5744+
let mut peer_state_lck = peer_state_mtx.lock().unwrap();
5745+
let peer_state = &mut *peer_state_lck;
5746+
if self.pending_events.lock().unwrap().iter()
5747+
.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
5748+
channel_funding_outpoint, counterparty_node_id
5749+
}))
5750+
{
5751+
// Check that, while holding the peer lock, we don't have another event
5752+
// blocking any monitor updates for this channel. If we do, let those
5753+
// events be the ones that ultimately release the monitor update(s).
5754+
log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
5755+
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
5756+
return;
5757+
}
5758+
if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
5759+
debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
5760+
if let Some((monitor_update, further_update_exists)) = chan.get_mut().fly_next_unflown_monitor_update() {
5761+
log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
5762+
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
5763+
let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update);
5764+
let update_id = monitor_update.update_id;
5765+
let _ = handle_error!(self,
5766+
handle_new_monitor_update!(self, update_res, update_id,
5767+
peer_state_lck, peer_state, per_peer_state, chan),
5768+
counterparty_node_id);
5769+
if further_update_exists {
5770+
// If there are more `ChannelMonitorUpdate`s to process, restart at the
5771+
// top of the loop.
5772+
continue;
5773+
}
5774+
} else {
5775+
log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update",
5776+
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
5777+
}
5778+
}
5779+
} else {
5780+
log_debug!(self.logger,
5781+
"Got a release post-RAA monitor update for peer {} but the channel is gone",
5782+
log_pubkey!(counterparty_node_id));
5783+
}
5784+
break;
5785+
}
5786+
}
5787+
5788+
fn handle_post_event_actions(&self, actions: Vec<EventCompletionAction>) {
5789+
for action in actions {
5790+
match action {
5791+
EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
5792+
channel_funding_outpoint, counterparty_node_id
5793+
} => {
5794+
self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
5795+
}
5796+
}
5797+
}
5798+
}
5799+
57405800
/// Processes any events asynchronously in the order they were generated since the last call
57415801
/// using the given event handler.
57425802
///
@@ -5757,22 +5817,34 @@ where
57575817
}
57585818

57595819
let _single_processor = self.pending_events_processor.lock().unwrap();
5760-
let mut next_event = self.pending_events.lock().unwrap().front().map(|ev| (*ev).clone());
57615820
loop {
5762-
if let Some((event, _action)) = next_event {
5763-
result = NotifyOption::DoPersist;
5764-
let ev_clone;
5765-
#[cfg(debug_assertions)] {
5766-
ev_clone = event.clone();
5821+
let mut next_event = self.pending_events.lock().unwrap().front().map(|ev| (*ev).clone());
5822+
let mut post_event_actions = Vec::new();
5823+
loop {
5824+
if let Some((event, action_opt)) = next_event {
5825+
result = NotifyOption::DoPersist;
5826+
let _ev_clone: Event;
5827+
#[cfg(debug_assertions)] {
5828+
_ev_clone = event.clone();
5829+
}
5830+
handler(event).await;
5831+
let mut pending_events = self.pending_events.lock().unwrap();
5832+
#[cfg(debug_assertions)] {
5833+
debug_assert_eq!(_ev_clone, pending_events.front().unwrap().0);
5834+
}
5835+
debug_assert_eq!(action_opt, pending_events.front().unwrap().1);
5836+
if let Some(action) = action_opt {
5837+
post_event_actions.push(action);
5838+
}
5839+
pending_events.pop_front();
5840+
next_event = pending_events.front().map(|ev| ev.clone());
5841+
} else {
5842+
break;
57675843
}
5768-
handler(event).await;
5769-
let mut pending_events = self.pending_events.lock().unwrap();
5770-
debug_assert_eq!(ev_clone, pending_events.front().unwrap().0);
5771-
pending_events.pop_front();
5772-
next_event = pending_events.front().map(|ev| ev.clone());
5773-
} else {
5774-
break;
57755844
}
5845+
if post_event_actions.is_empty() { break; }
5846+
self.handle_post_event_actions(post_event_actions);
5847+
// If we had some actions, go around again as we may have more events now
57765848
}
57775849

57785850
if result == NotifyOption::DoPersist {
@@ -5869,22 +5941,34 @@ where
58695941
}
58705942

58715943
let _single_processor = self.pending_events_processor.lock().unwrap();
5872-
let mut next_event = self.pending_events.lock().unwrap().front().map(|ev| (*ev).clone());
58735944
loop {
5874-
if let Some((event, _action)) = next_event {
5875-
result = NotifyOption::DoPersist;
5876-
let ev_clone;
5877-
#[cfg(debug_assertions)] {
5878-
ev_clone = event.clone();
5945+
let mut next_event = self.pending_events.lock().unwrap().front().map(|ev| (*ev).clone());
5946+
let mut post_event_actions = Vec::new();
5947+
loop {
5948+
if let Some((event, action_opt)) = next_event {
5949+
result = NotifyOption::DoPersist;
5950+
let _ev_clone: Event;
5951+
#[cfg(debug_assertions)] {
5952+
_ev_clone = event.clone();
5953+
}
5954+
handler.handle_event(event);
5955+
let mut pending_events = self.pending_events.lock().unwrap();
5956+
#[cfg(debug_assertions)] {
5957+
debug_assert_eq!(_ev_clone, pending_events.front().unwrap().0);
5958+
}
5959+
debug_assert_eq!(action_opt, pending_events.front().unwrap().1);
5960+
if let Some(action) = action_opt {
5961+
post_event_actions.push(action);
5962+
}
5963+
pending_events.pop_front();
5964+
next_event = pending_events.front().map(|ev| ev.clone());
5965+
} else {
5966+
break;
58795967
}
5880-
handler.handle_event(event);
5881-
let mut pending_events = self.pending_events.lock().unwrap();
5882-
debug_assert_eq!(ev_clone, pending_events.front().unwrap().0);
5883-
pending_events.pop_front();
5884-
next_event = pending_events.front().map(|ev| ev.clone());
5885-
} else {
5886-
break;
58875968
}
5969+
if post_event_actions.is_empty() { break; }
5970+
self.handle_post_event_actions(post_event_actions);
5971+
// If we had some actions, go around again as we may have more events now
58885972
}
58895973

58905974
result

0 commit comments

Comments
 (0)