Move in-flight ChannelMonitorUpdates to ChannelManager #2362

Merged
164 changes: 52 additions & 112 deletions lightning/src/ln/channel.rs
@@ -488,13 +488,13 @@ enum UpdateFulfillFetch {
}

/// The return type of get_update_fulfill_htlc_and_commit.
-pub enum UpdateFulfillCommitFetch<'a> {
+pub enum UpdateFulfillCommitFetch {
/// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
/// it in the holding cell, or re-generated the update_fulfill message after the same claim was
/// previously placed in the holding cell (and has since been removed).
NewClaim {
/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
-monitor_update: &'a ChannelMonitorUpdate,
+monitor_update: ChannelMonitorUpdate,
/// The value of the HTLC which was claimed, in msat.
htlc_value_msat: u64,
},
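
With the `'a` lifetime gone, `get_update_fulfill_htlc_and_commit` now hands the `ChannelMonitorUpdate` to its caller by value rather than leaving it borrowed out of the `Channel`. A minimal sketch of why that matters on the caller side — stand-in types only, not LDK's real API:

#[derive(Debug)]
struct ChannelMonitorUpdate { update_id: u64 }

struct Channel { latest_update_id: u64 }

impl Channel {
    // Returns the update by value, as the diff changes the real code to do.
    fn claim_htlc(&mut self) -> ChannelMonitorUpdate {
        self.latest_update_id += 1;
        ChannelMonitorUpdate { update_id: self.latest_update_id }
    }
}

fn main() {
    let mut chan = Channel { latest_update_id: 0 };
    let update = chan.claim_htlc();
    // The channel is no longer borrowed, so the caller can keep the update
    // as in-flight state of its own while continuing to use the channel.
    let mut in_flight: Vec<ChannelMonitorUpdate> = Vec::new();
    in_flight.push(update);
    let _ = chan.claim_htlc();
    println!("in-flight: {:?}", in_flight);
}
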
@@ -588,17 +588,10 @@ pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2;

struct PendingChannelMonitorUpdate {
update: ChannelMonitorUpdate,
-/// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
-/// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
-/// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
-///
-/// [`ChannelManager`]: super::channelmanager::ChannelManager
-blocked: bool,
}

impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
(0, update, required),
-(2, blocked, required),
});

/// Contains everything about the channel including state, and various flags.
@@ -869,11 +862,9 @@ pub(super) struct ChannelContext<Signer: ChannelSigner> {
/// [`SignerProvider::derive_channel_signer`].
channel_keys_id: [u8; 32],

-/// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately.
-/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
-/// completes we still need to be able to complete the persistence. Thus, we have to keep a
-/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
-pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
+/// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we
+/// store it here and only release it to the `ChannelManager` once it asks for it.
+blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>,
}

impl<Signer: ChannelSigner> ChannelContext<Signer> {
@@ -2264,51 +2255,38 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}

pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
-let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
+let release_cs_monitor = self.context.blocked_monitor_updates.is_empty();
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
// Even if we aren't supposed to let new monitor updates with commitment state
// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
// matter what. Sadly, to push a new monitor update which flies before others
// already queued, we have to insert it into the pending queue and update the
// update_ids of all the following monitors.
-let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
+if release_cs_monitor && msg.is_some() {
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them
// to be strictly increasing by one, so decrement it here.
self.context.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
-self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-update: monitor_update, blocked: false,
-});
-self.context.pending_monitor_updates.len() - 1
} else {
-let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
-.unwrap_or(self.context.pending_monitor_updates.len());
-let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
+let new_mon_id = self.context.blocked_monitor_updates.get(0)
.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
monitor_update.update_id = new_mon_id;
-self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
-update: monitor_update, blocked: false,
-});
-for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
+for held_update in self.context.blocked_monitor_updates.iter_mut() {
held_update.update.update_id += 1;
}
if msg.is_some() {
debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
let update = self.build_commitment_no_status_check(logger);
-self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-update, blocked: true,
+self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
+update,
});
}
-insert_pos
-};
-self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
-UpdateFulfillCommitFetch::NewClaim {
-monitor_update: &self.context.pending_monitor_updates.get(unblocked_update_pos)
-.expect("We just pushed the monitor update").update,
-htlc_value_msat,
-}
+}
+
+self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
+UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, }
},
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
}
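
The `else` branch above is the subtle part: a fresh preimage update has to fly before the already-blocked updates, so it takes over the first blocked update's ID and every blocked update shifts up by one, keeping released IDs gapless and strictly increasing. A toy model of that rotation, with illustrative types in place of LDK's:

#[derive(Debug)]
struct Update { update_id: u64 }

fn queue_jumping_insert(blocked: &mut Vec<Update>, mut new_update: Update) -> Update {
    // The released update steals the first blocked update's ID...
    if let Some(first) = blocked.first() {
        new_update.update_id = first.update_id;
    }
    // ...and every blocked update is renumbered one higher.
    for held in blocked.iter_mut() {
        held.update_id += 1;
    }
    new_update
}

fn main() {
    // Updates 5 and 6 are blocked; the fresh preimage update was assigned 7.
    let mut blocked = vec![Update { update_id: 5 }, Update { update_id: 6 }];
    let released = queue_jumping_insert(&mut blocked, Update { update_id: 7 });
    assert_eq!(released.update_id, 5);
    assert_eq!(blocked[0].update_id, 6);
    assert_eq!(blocked[1].update_id, 7);
    println!("released {:?}, blocked {:?}", released, blocked);
}
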
@@ -2798,7 +2776,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
Ok(())
}

-pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
+pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
where L::Target: Logger
{
if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3022,7 +3000,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// Public version of the below, checking relevant preconditions first.
/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
/// returns `(None, Vec::new())`.
-pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
+pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
if self.context.channel_state >= ChannelState::ChannelReady as u32 &&
(self.context.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
self.free_holding_cell_htlcs(logger)
@@ -3031,7 +3009,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {

/// Frees any pending commitment updates in the holding cell, generating the relevant messages
/// for our counterparty.
-fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
+fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() {
log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(),
@@ -3147,7 +3125,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
/// generating an appropriate error *after* the channel state has been updated based on the
/// revoke_and_ack message.
-pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
+pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<ChannelMonitorUpdate>), ChannelError>
where L::Target: Logger,
{
if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3349,8 +3327,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}

match self.free_holding_cell_htlcs(logger) {
-(Some(_), htlcs_to_fail) => {
-let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
+(Some(mut additional_update), htlcs_to_fail) => {
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
// strictly increasing by one, so decrement it here.
self.context.latest_monitor_update_id = monitor_update.update_id;
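
The same ID bookkeeping shows up here: when `free_holding_cell_htlcs` produces an `additional_update`, it is folded into the `revoke_and_ack` update and the counter rewound, so each released update still advances the ID by exactly one. A small sketch of the fold, assuming simplified stand-in types:

#[derive(Debug)]
struct MonUpdate { update_id: u64, updates: Vec<&'static str> }

struct Ctx { latest_monitor_update_id: u64 }

impl Ctx {
    fn next_update(&mut self, step: &'static str) -> MonUpdate {
        self.latest_monitor_update_id += 1;
        MonUpdate { update_id: self.latest_monitor_update_id, updates: vec![step] }
    }
}

fn main() {
    let mut ctx = Ctx { latest_monitor_update_id: 0 };
    let mut monitor_update = ctx.next_update("revoke_and_ack");       // id 1
    let mut additional_update = ctx.next_update("commitment_signed"); // bumps counter to 2
    // Fold the second update into the first and rewind the counter, as the
    // diff does, so a single update with id 1 is released.
    ctx.latest_monitor_update_id = monitor_update.update_id;
    monitor_update.updates.append(&mut additional_update.updates);
    assert_eq!(monitor_update.update_id, 1);
    assert_eq!(ctx.latest_monitor_update_id, 1);
    println!("{:?}", monitor_update);
}
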
@@ -3566,12 +3543,6 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
{
assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
-let mut found_blocked = false;
-self.context.pending_monitor_updates.retain(|upd| {
-if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
-if upd.blocked { found_blocked = true; }
-upd.blocked
-});

// If we're past (or at) the FundingSent stage on an outbound channel, try to
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4075,7 +4046,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {

pub fn shutdown<SP: Deref>(
&mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
-) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
+) -> Result<(Option<msgs::Shutdown>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
where SP::Target: SignerProvider
{
if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
@@ -4141,9 +4112,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}],
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
-if self.push_blockable_mon_update(monitor_update) {
-self.context.pending_monitor_updates.last().map(|upd| &upd.update)
-} else { None }
+self.push_ret_blockable_mon_update(monitor_update)
} else { None };
let shutdown = if send_shutdown {
Some(msgs::Shutdown {
@@ -4433,64 +4402,37 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
}

-pub fn get_latest_complete_monitor_update_id(&self) -> u64 {
-if self.context.pending_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); }
-self.context.pending_monitor_updates[0].update.update_id - 1
+/// Gets the latest [`ChannelMonitorUpdate`] ID which has been released and is in-flight.
+pub fn get_latest_unblocked_monitor_update_id(&self) -> u64 {
+if self.context.blocked_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); }
+self.context.blocked_monitor_updates[0].update.update_id - 1
}

/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
/// further blocked monitor update exists after the next.
-pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
-for i in 0..self.context.pending_monitor_updates.len() {
-if self.context.pending_monitor_updates[i].blocked {
-self.context.pending_monitor_updates[i].blocked = false;
-return Some((&self.context.pending_monitor_updates[i].update,
-self.context.pending_monitor_updates.len() > i + 1));
-}
-}
-None
+pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
+if self.context.blocked_monitor_updates.is_empty() { return None; }
+Some((self.context.blocked_monitor_updates.remove(0).update,
+!self.context.blocked_monitor_updates.is_empty()))
}

-/// Pushes a new monitor update into our monitor update queue, returning whether it should be
-/// immediately given to the user for persisting or if it should be held as blocked.
-fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
-let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
-self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-update, blocked: !release_monitor
-});
-release_monitor
-}
-
-/// Pushes a new monitor update into our monitor update queue, returning a reference to it if
-/// it should be immediately given to the user for persisting or `None` if it should be held as
-/// blocked.
+/// Pushes a new monitor update into our monitor update queue, returning it if it should be
+/// immediately given to the user for persisting or `None` if it should be held as blocked.
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
--> Option<&ChannelMonitorUpdate> {
-let release_monitor = self.push_blockable_mon_update(update);
-if release_monitor { self.context.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
-}
-
-pub fn no_monitor_updates_pending(&self) -> bool {
-self.context.pending_monitor_updates.is_empty()
-}
-
-pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
-self.context.pending_monitor_updates.retain(|upd| {
-if upd.update.update_id <= update_id {
-assert!(!upd.blocked, "Completed update must have flown");
-false
-} else { true }
-});
-}
-
-pub fn complete_one_mon_update(&mut self, update_id: u64) {
-self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
-}
+-> Option<ChannelMonitorUpdate> {
+let release_monitor = self.context.blocked_monitor_updates.is_empty();
+if !release_monitor {
+self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
+update,
+});
+None
+} else {
+Some(update)
+}
+}

-/// Returns an iterator over all unblocked monitor updates which have not yet completed.
-pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
-self.context.pending_monitor_updates.iter()
-.filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
+pub fn blocked_monitor_updates_pending(&self) -> usize {
+self.context.blocked_monitor_updates.len()
}

/// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.
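
Taken together, this hunk swaps the old complete/uncomplete bookkeeping for a small FIFO-and-drain API. A self-contained model of the three entry points — stand-in types, with the real `ChannelManager` wiring out of scope for this file:

struct ChannelMonitorUpdate { update_id: u64 }

struct PendingChannelMonitorUpdate { update: ChannelMonitorUpdate }

struct ChannelModel {
    latest_monitor_update_id: u64,
    blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>,
}

impl ChannelModel {
    // Blocked updates hold contiguous IDs, so everything below the first
    // blocked ID has been released.
    fn get_latest_unblocked_monitor_update_id(&self) -> u64 {
        if self.blocked_monitor_updates.is_empty() { return self.latest_monitor_update_id; }
        self.blocked_monitor_updates[0].update.update_id - 1
    }

    // Release immediately only if nothing is already blocked; otherwise the
    // update must queue behind the earlier ones to preserve ordering.
    fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
    -> Option<ChannelMonitorUpdate> {
        if self.blocked_monitor_updates.is_empty() {
            Some(update)
        } else {
            self.blocked_monitor_updates.push(PendingChannelMonitorUpdate { update });
            None
        }
    }

    // Drain in FIFO order, reporting whether more blocked updates remain.
    fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
        if self.blocked_monitor_updates.is_empty() { return None; }
        Some((self.blocked_monitor_updates.remove(0).update,
            !self.blocked_monitor_updates.is_empty()))
    }
}

fn main() {
    let mut chan = ChannelModel {
        latest_monitor_update_id: 4,
        blocked_monitor_updates: vec![
            PendingChannelMonitorUpdate { update: ChannelMonitorUpdate { update_id: 3 } },
            PendingChannelMonitorUpdate { update: ChannelMonitorUpdate { update_id: 4 } },
        ],
    };
    // IDs 3 and 4 exist but are blocked, so only ID 2 counts as released.
    assert_eq!(chan.get_latest_unblocked_monitor_update_id(), 2);
    // A new update must queue behind the blocked ones.
    assert!(chan.push_ret_blockable_mon_update(ChannelMonitorUpdate { update_id: 5 }).is_none());
    // Draining releases in FIFO order.
    let (upd, more) = chan.unblock_next_blocked_monitor_update().unwrap();
    assert_eq!(upd.update_id, 3);
    assert!(more);
}
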
@@ -5302,7 +5244,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
pub fn send_htlc_and_commit<L: Deref>(
&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>, logger: &L
-) -> Result<Option<&ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
+) -> Result<Option<ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source,
onion_routing_packet, false, skimmed_fee_msat, logger);
if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } }
@@ -5336,7 +5278,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// [`ChannelMonitorUpdate`] will be returned).
pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
target_feerate_sats_per_kw: Option<u32>, override_shutdown_script: Option<ShutdownScript>)
--> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
where SP::Target: SignerProvider {
for htlc in self.context.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5407,9 +5349,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}],
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
-if self.push_blockable_mon_update(monitor_update) {
-self.context.pending_monitor_updates.last().map(|upd| &upd.update)
-} else { None }
+self.push_ret_blockable_mon_update(monitor_update)
} else { None };
let shutdown = msgs::Shutdown {
channel_id: self.context.channel_id,
@@ -5648,7 +5588,7 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
channel_type,
channel_keys_id,

-pending_monitor_updates: Vec::new(),
+blocked_monitor_updates: Vec::new(),
}
})
}
@@ -6278,7 +6218,7 @@ impl<Signer: WriteableEcdsaChannelSigner> InboundV1Channel<Signer> {
channel_type,
channel_keys_id,

-pending_monitor_updates: Vec::new(),
+blocked_monitor_updates: Vec::new(),
}
};

@@ -6864,7 +6804,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
(28, holder_max_accepted_htlcs, option),
(29, self.context.temporary_channel_id, option),
(31, channel_pending_event_emitted, option),
-(33, self.context.pending_monitor_updates, vec_type),
+(33, self.context.blocked_monitor_updates, vec_type),
(35, pending_outbound_skimmed_fees, optional_vec),
(37, holding_cell_skimmed_fees, optional_vec),
});
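
Note that the renamed field still serializes under TLV type 33, so the rename by itself leaves the encoding untouched; readers key on the record number, never the Rust field name. A toy TLV writer makes the point — an assumed simplified format, not LDK's actual `impl_writeable_tlv_based` encoding:

fn write_tlv_record(out: &mut Vec<u8>, typ: u8, value: &[u8]) {
    // Toy TLV: one-byte type, one-byte length, then the value bytes.
    out.push(typ);
    out.push(value.len() as u8);
    out.extend_from_slice(value);
}

fn main() {
    let update_bytes = [1u8, 2, 3, 4]; // stand-in for a serialized update list
    let mut before_rename = Vec::new();
    write_tlv_record(&mut before_rename, 33, &update_bytes); // pending_monitor_updates
    let mut after_rename = Vec::new();
    write_tlv_record(&mut after_rename, 33, &update_bytes); // blocked_monitor_updates
    // Identical bytes: only the in-memory name changed, not the record type.
    assert_eq!(before_rename, after_rename);
}
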
@@ -7145,7 +7085,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
let mut temporary_channel_id: Option<[u8; 32]> = None;
let mut holder_max_accepted_htlcs: Option<u16> = None;

-let mut pending_monitor_updates = Some(Vec::new());
+let mut blocked_monitor_updates = Some(Vec::new());

let mut pending_outbound_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
let mut holding_cell_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
@@ -7172,7 +7112,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
(28, holder_max_accepted_htlcs, option),
(29, temporary_channel_id, option),
(31, channel_pending_event_emitted, option),
-(33, pending_monitor_updates, vec_type),
+(33, blocked_monitor_updates, vec_type),
(35, pending_outbound_skimmed_fees_opt, optional_vec),
(37, holding_cell_skimmed_fees_opt, optional_vec),
});
@@ -7365,7 +7305,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
channel_type: channel_type.unwrap(),
channel_keys_id,

-pending_monitor_updates: pending_monitor_updates.unwrap(),
+blocked_monitor_updates: blocked_monitor_updates.unwrap(),
}
})
}