Skip to content

Commit 024e384

Browse files
committed
Handle MonitorUpdateCompletionActions in monitor update completion
In a previous PR, we added a `MonitorUpdateCompletionAction` enum which described actions to take after a `ChannelMonitorUpdate` persistence completes. At the time, it was only used to execute actions in-line; however, in the next commit we'll start (correctly) deferring the existing actions until after monitor updates complete.
1 parent 4f5625b commit 024e384

File tree

1 file changed

+47
-7
lines changed

1 file changed

+47
-7
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 47 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -445,6 +445,7 @@ enum BackgroundEvent {
445445
ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
446446
}
447447

448+
#[derive(Debug)]
448449
pub(crate) enum MonitorUpdateCompletionAction {
449450
/// Indicates that a payment ultimately destined for us was claimed and we should emit an
450451
/// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
@@ -1627,9 +1628,20 @@ macro_rules! handle_new_monitor_update {
16271628
})
16281629
} else { None }
16291630
} else { None };
1630-
let htlc_forwards = $self.handle_channel_resumption(&mut $msg_queue,
1631-
$chan, updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs,
1632-
updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
1631+
1632+
let htlc_forwards = {
1633+
let per_peer_state = $self.per_peer_state.read().unwrap();
1634+
let mut peer_state = per_peer_state.get(&counterparty_node_id)
1635+
.expect("XXX: This may be reachable today, I believe, but once we move the channel storage to per_peer_state it won't be.")
1636+
.lock().unwrap();
1637+
1638+
let update_actions = peer_state.monitor_update_blocked_actions
1639+
.remove(&$chan.channel_id()).unwrap_or(Vec::new());
1640+
$self.handle_channel_resumption(&mut $msg_queue, $chan,
1641+
update_actions, updates.raa, updates.commitment_update, updates.order,
1642+
updates.accepted_htlcs, updates.funding_broadcastable,
1643+
updates.channel_ready, updates.announcement_sigs)
1644+
};
16331645
if let Some(upd) = channel_update {
16341646
$msg_queue.push(upd);
16351647
}
@@ -4510,11 +4522,20 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
45104522
/// Handles a channel reentering a functional state, either due to reconnect or a monitor
45114523
/// update completion.
45124524
fn handle_channel_resumption(&self, pending_msg_events: &mut Vec<MessageSendEvent>,
4513-
channel: &mut Channel<<K::Target as KeysInterface>::Signer>, raa: Option<msgs::RevokeAndACK>,
4525+
channel: &mut Channel<<K::Target as KeysInterface>::Signer>,
4526+
actions: Vec<MonitorUpdateCompletionAction>, raa: Option<msgs::RevokeAndACK>,
45144527
commitment_update: Option<msgs::CommitmentUpdate>, order: RAACommitmentOrder,
45154528
pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
45164529
channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
45174530
-> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> {
4531+
log_trace!(self.logger, "Handling channel resumption for channel {} with {} actions, {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
4532+
log_bytes!(channel.channel_id()), actions.len(),
4533+
if raa.is_some() { "an" } else { "no" },
4534+
if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
4535+
if funding_broadcastable.is_some() { "" } else { "not " },
4536+
if channel_ready.is_some() { "sending" } else { "without" },
4537+
if announcement_sigs.is_some() { "sending" } else { "without" });
4538+
45184539
let mut htlc_forwards = None;
45194540

45204541
let counterparty_node_id = channel.get_counterparty_node_id();
@@ -4567,6 +4588,8 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
45674588
self.tx_broadcaster.broadcast_transaction(&tx);
45684589
}
45694590

4591+
self.handle_monitor_update_completion_actions(actions);
4592+
45704593
htlc_forwards
45714594
}
45724595

@@ -4581,11 +4604,22 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
45814604
hash_map::Entry::Occupied(chan) => chan,
45824605
hash_map::Entry::Vacant(_) => return,
45834606
};
4607+
log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}",
4608+
highest_applied_update_id, channel.get().get_latest_monitor_update_id());
45844609
if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
45854610
return;
45864611
}
45874612

45884613
let counterparty_node_id = channel.get().get_counterparty_node_id();
4614+
let per_peer_state = self.per_peer_state.read().unwrap();
4615+
let mut peer_state = match per_peer_state.get(&counterparty_node_id) {
4616+
Some(peer_state) => peer_state.lock().unwrap(),
4617+
None => {
4618+
debug_assert!(false, "XXX: This may be reachable today, I believe, but once we move the channel storage to per_peer_state it won't be.");
4619+
return;
4620+
}
4621+
};
4622+
45894623
let updates = channel.get_mut().monitor_updating_restored(&self.logger, self.get_our_node_id(), self.genesis_hash, self.best_block.read().unwrap().height());
45904624
let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() {
45914625
// We only send a channel_update in the case where we are just now sending a
@@ -4600,7 +4634,12 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
46004634
})
46014635
} else { None }
46024636
} else { None };
4603-
htlc_forwards = self.handle_channel_resumption(&mut channel_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
4637+
let update_actions = peer_state.monitor_update_blocked_actions
4638+
.remove(&channel.get().channel_id()).unwrap_or(Vec::new());
4639+
htlc_forwards = self.handle_channel_resumption(&mut channel_state.pending_msg_events,
4640+
channel.get_mut(), update_actions,
4641+
updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs,
4642+
updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
46044643
if let Some(upd) = channel_update {
46054644
channel_state.pending_msg_events.push(upd);
46064645
}
@@ -5452,8 +5491,9 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
54525491
}
54535492
let need_lnd_workaround = chan.get_mut().workaround_lnd_bug_4006.take();
54545493
htlc_forwards = self.handle_channel_resumption(
5455-
&mut channel_state.pending_msg_events, chan.get_mut(), responses.raa, responses.commitment_update, responses.order,
5456-
Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
5494+
&mut channel_state.pending_msg_events, chan.get_mut(), Vec::new(),
5495+
responses.raa, responses.commitment_update, responses.order, Vec::new(),
5496+
None, responses.channel_ready, responses.announcement_sigs);
54575497
if let Some(upd) = channel_update {
54585498
channel_state.pending_msg_events.push(upd);
54595499
}

0 commit comments

Comments
 (0)