Skip to content

Commit f833448

Browse files
authored
Merge pull request #2362 from TheBlueMatt/2023-06-unblocked-mons-in-manager
Move in-flight ChannelMonitorUpdates to ChannelManager
2 parents 3973865 + 9c3ad28 commit f833448

File tree

3 files changed

+274
-212
lines changed

3 files changed

+274
-212
lines changed

lightning/src/ln/channel.rs

+52-112
Original file line numberDiff line numberDiff line change
@@ -488,13 +488,13 @@ enum UpdateFulfillFetch {
488488
}
489489

490490
/// The return type of get_update_fulfill_htlc_and_commit.
491-
pub enum UpdateFulfillCommitFetch<'a> {
491+
pub enum UpdateFulfillCommitFetch {
492492
/// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
493493
/// it in the holding cell, or re-generated the update_fulfill message after the same claim was
494494
/// previously placed in the holding cell (and has since been removed).
495495
NewClaim {
496496
/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
497-
monitor_update: &'a ChannelMonitorUpdate,
497+
monitor_update: ChannelMonitorUpdate,
498498
/// The value of the HTLC which was claimed, in msat.
499499
htlc_value_msat: u64,
500500
},
@@ -588,17 +588,10 @@ pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2;
588588

589589
struct PendingChannelMonitorUpdate {
590590
update: ChannelMonitorUpdate,
591-
/// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
592-
/// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
593-
/// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
594-
///
595-
/// [`ChannelManager`]: super::channelmanager::ChannelManager
596-
blocked: bool,
597591
}
598592

599593
impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
600594
(0, update, required),
601-
(2, blocked, required),
602595
});
603596

604597
/// Contains everything about the channel including state, and various flags.
@@ -869,11 +862,9 @@ pub(super) struct ChannelContext<Signer: ChannelSigner> {
869862
/// [`SignerProvider::derive_channel_signer`].
870863
channel_keys_id: [u8; 32],
871864

872-
/// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately.
873-
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
874-
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
875-
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
876-
pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
865+
/// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we
866+
/// store it here and only release it to the `ChannelManager` once it asks for it.
867+
blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>,
877868
}
878869

879870
impl<Signer: ChannelSigner> ChannelContext<Signer> {
@@ -2259,51 +2250,38 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
22592250
}
22602251

22612252
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
2262-
let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
2253+
let release_cs_monitor = self.context.blocked_monitor_updates.is_empty();
22632254
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
22642255
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
22652256
// Even if we aren't supposed to let new monitor updates with commitment state
22662257
// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
22672258
// matter what. Sadly, to push a new monitor update which flies before others
22682259
// already queued, we have to insert it into the pending queue and update the
22692260
// update_ids of all the following monitors.
2270-
let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
2261+
if release_cs_monitor && msg.is_some() {
22712262
let mut additional_update = self.build_commitment_no_status_check(logger);
22722263
// build_commitment_no_status_check may bump latest_monitor_id but we want them
22732264
// to be strictly increasing by one, so decrement it here.
22742265
self.context.latest_monitor_update_id = monitor_update.update_id;
22752266
monitor_update.updates.append(&mut additional_update.updates);
2276-
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
2277-
update: monitor_update, blocked: false,
2278-
});
2279-
self.context.pending_monitor_updates.len() - 1
22802267
} else {
2281-
let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
2282-
.unwrap_or(self.context.pending_monitor_updates.len());
2283-
let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
2268+
let new_mon_id = self.context.blocked_monitor_updates.get(0)
22842269
.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
22852270
monitor_update.update_id = new_mon_id;
2286-
self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
2287-
update: monitor_update, blocked: false,
2288-
});
2289-
for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
2271+
for held_update in self.context.blocked_monitor_updates.iter_mut() {
22902272
held_update.update.update_id += 1;
22912273
}
22922274
if msg.is_some() {
22932275
debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
22942276
let update = self.build_commitment_no_status_check(logger);
2295-
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
2296-
update, blocked: true,
2277+
self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
2278+
update,
22972279
});
22982280
}
2299-
insert_pos
2300-
};
2301-
self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
2302-
UpdateFulfillCommitFetch::NewClaim {
2303-
monitor_update: &self.context.pending_monitor_updates.get(unblocked_update_pos)
2304-
.expect("We just pushed the monitor update").update,
2305-
htlc_value_msat,
23062281
}
2282+
2283+
self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
2284+
UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, }
23072285
},
23082286
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
23092287
}
@@ -2793,7 +2771,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
27932771
Ok(())
27942772
}
27952773

2796-
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
2774+
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
27972775
where L::Target: Logger
27982776
{
27992777
if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3017,7 +2995,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
30172995
/// Public version of the below, checking relevant preconditions first.
30182996
/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
30192997
/// returns `(None, Vec::new())`.
3020-
pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
2998+
pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
30212999
if self.context.channel_state >= ChannelState::ChannelReady as u32 &&
30223000
(self.context.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
30233001
self.free_holding_cell_htlcs(logger)
@@ -3026,7 +3004,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
30263004

30273005
/// Frees any pending commitment updates in the holding cell, generating the relevant messages
30283006
/// for our counterparty.
3029-
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
3007+
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
30303008
assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
30313009
if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() {
30323010
log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(),
@@ -3142,7 +3120,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
31423120
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
31433121
/// generating an appropriate error *after* the channel state has been updated based on the
31443122
/// revoke_and_ack message.
3145-
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
3123+
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<ChannelMonitorUpdate>), ChannelError>
31463124
where L::Target: Logger,
31473125
{
31483126
if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3344,8 +3322,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
33443322
}
33453323

33463324
match self.free_holding_cell_htlcs(logger) {
3347-
(Some(_), htlcs_to_fail) => {
3348-
let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
3325+
(Some(mut additional_update), htlcs_to_fail) => {
33493326
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
33503327
// strictly increasing by one, so decrement it here.
33513328
self.context.latest_monitor_update_id = monitor_update.update_id;
@@ -3561,12 +3538,6 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
35613538
{
35623539
assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
35633540
self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
3564-
let mut found_blocked = false;
3565-
self.context.pending_monitor_updates.retain(|upd| {
3566-
if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
3567-
if upd.blocked { found_blocked = true; }
3568-
upd.blocked
3569-
});
35703541

35713542
// If we're past (or at) the FundingSent stage on an outbound channel, try to
35723543
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4070,7 +4041,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
40704041

40714042
pub fn shutdown<SP: Deref>(
40724043
&mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
4073-
) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
4044+
) -> Result<(Option<msgs::Shutdown>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
40744045
where SP::Target: SignerProvider
40754046
{
40764047
if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
@@ -4136,9 +4107,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
41364107
}],
41374108
};
41384109
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
4139-
if self.push_blockable_mon_update(monitor_update) {
4140-
self.context.pending_monitor_updates.last().map(|upd| &upd.update)
4141-
} else { None }
4110+
self.push_ret_blockable_mon_update(monitor_update)
41424111
} else { None };
41434112
let shutdown = if send_shutdown {
41444113
Some(msgs::Shutdown {
@@ -4428,64 +4397,37 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
44284397
(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
44294398
}
44304399

4431-
pub fn get_latest_complete_monitor_update_id(&self) -> u64 {
4432-
if self.context.pending_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); }
4433-
self.context.pending_monitor_updates[0].update.update_id - 1
4400+
/// Gets the latest [`ChannelMonitorUpdate`] ID which has been released and is in-flight.
4401+
pub fn get_latest_unblocked_monitor_update_id(&self) -> u64 {
4402+
if self.context.blocked_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); }
4403+
self.context.blocked_monitor_updates[0].update.update_id - 1
44344404
}
44354405

44364406
/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
44374407
/// further blocked monitor update exists after the next.
4438-
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
4439-
for i in 0..self.context.pending_monitor_updates.len() {
4440-
if self.context.pending_monitor_updates[i].blocked {
4441-
self.context.pending_monitor_updates[i].blocked = false;
4442-
return Some((&self.context.pending_monitor_updates[i].update,
4443-
self.context.pending_monitor_updates.len() > i + 1));
4444-
}
4445-
}
4446-
None
4408+
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
4409+
if self.context.blocked_monitor_updates.is_empty() { return None; }
4410+
Some((self.context.blocked_monitor_updates.remove(0).update,
4411+
!self.context.blocked_monitor_updates.is_empty()))
44474412
}
44484413

4449-
/// Pushes a new monitor update into our monitor update queue, returning whether it should be
4450-
/// immediately given to the user for persisting or if it should be held as blocked.
4451-
fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
4452-
let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
4453-
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
4454-
update, blocked: !release_monitor
4455-
});
4456-
release_monitor
4457-
}
4458-
4459-
/// Pushes a new monitor update into our monitor update queue, returning a reference to it if
4460-
/// it should be immediately given to the user for persisting or `None` if it should be held as
4461-
/// blocked.
4414+
/// Pushes a new monitor update into our monitor update queue, returning it if it should be
4415+
/// immediately given to the user for persisting or `None` if it should be held as blocked.
44624416
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
4463-
-> Option<&ChannelMonitorUpdate> {
4464-
let release_monitor = self.push_blockable_mon_update(update);
4465-
if release_monitor { self.context.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
4466-
}
4467-
4468-
pub fn no_monitor_updates_pending(&self) -> bool {
4469-
self.context.pending_monitor_updates.is_empty()
4470-
}
4471-
4472-
pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
4473-
self.context.pending_monitor_updates.retain(|upd| {
4474-
if upd.update.update_id <= update_id {
4475-
assert!(!upd.blocked, "Completed update must have flown");
4476-
false
4477-
} else { true }
4478-
});
4479-
}
4480-
4481-
pub fn complete_one_mon_update(&mut self, update_id: u64) {
4482-
self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
4417+
-> Option<ChannelMonitorUpdate> {
4418+
let release_monitor = self.context.blocked_monitor_updates.is_empty();
4419+
if !release_monitor {
4420+
self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
4421+
update,
4422+
});
4423+
None
4424+
} else {
4425+
Some(update)
4426+
}
44834427
}
44844428

4485-
/// Returns an iterator over all unblocked monitor updates which have not yet completed.
4486-
pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
4487-
self.context.pending_monitor_updates.iter()
4488-
.filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
4429+
pub fn blocked_monitor_updates_pending(&self) -> usize {
4430+
self.context.blocked_monitor_updates.len()
44894431
}
44904432

44914433
/// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.
@@ -5297,7 +5239,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
52975239
pub fn send_htlc_and_commit<L: Deref>(
52985240
&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
52995241
onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>, logger: &L
5300-
) -> Result<Option<&ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
5242+
) -> Result<Option<ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
53015243
let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source,
53025244
onion_routing_packet, false, skimmed_fee_msat, logger);
53035245
if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } }
@@ -5331,7 +5273,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
53315273
/// [`ChannelMonitorUpdate`] will be returned).
53325274
pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
53335275
target_feerate_sats_per_kw: Option<u32>, override_shutdown_script: Option<ShutdownScript>)
5334-
-> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
5276+
-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
53355277
where SP::Target: SignerProvider {
53365278
for htlc in self.context.pending_outbound_htlcs.iter() {
53375279
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5402,9 +5344,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
54025344
}],
54035345
};
54045346
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
5405-
if self.push_blockable_mon_update(monitor_update) {
5406-
self.context.pending_monitor_updates.last().map(|upd| &upd.update)
5407-
} else { None }
5347+
self.push_ret_blockable_mon_update(monitor_update)
54085348
} else { None };
54095349
let shutdown = msgs::Shutdown {
54105350
channel_id: self.context.channel_id,
@@ -5642,7 +5582,7 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
56425582
channel_type,
56435583
channel_keys_id,
56445584

5645-
pending_monitor_updates: Vec::new(),
5585+
blocked_monitor_updates: Vec::new(),
56465586
}
56475587
})
56485588
}
@@ -6271,7 +6211,7 @@ impl<Signer: WriteableEcdsaChannelSigner> InboundV1Channel<Signer> {
62716211
channel_type,
62726212
channel_keys_id,
62736213

6274-
pending_monitor_updates: Vec::new(),
6214+
blocked_monitor_updates: Vec::new(),
62756215
}
62766216
};
62776217

@@ -6857,7 +6797,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
68576797
(28, holder_max_accepted_htlcs, option),
68586798
(29, self.context.temporary_channel_id, option),
68596799
(31, channel_pending_event_emitted, option),
6860-
(33, self.context.pending_monitor_updates, vec_type),
6800+
(33, self.context.blocked_monitor_updates, vec_type),
68616801
(35, pending_outbound_skimmed_fees, optional_vec),
68626802
(37, holding_cell_skimmed_fees, optional_vec),
68636803
});
@@ -7138,7 +7078,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
71387078
let mut temporary_channel_id: Option<[u8; 32]> = None;
71397079
let mut holder_max_accepted_htlcs: Option<u16> = None;
71407080

7141-
let mut pending_monitor_updates = Some(Vec::new());
7081+
let mut blocked_monitor_updates = Some(Vec::new());
71427082

71437083
let mut pending_outbound_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
71447084
let mut holding_cell_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
@@ -7165,7 +7105,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
71657105
(28, holder_max_accepted_htlcs, option),
71667106
(29, temporary_channel_id, option),
71677107
(31, channel_pending_event_emitted, option),
7168-
(33, pending_monitor_updates, vec_type),
7108+
(33, blocked_monitor_updates, vec_type),
71697109
(35, pending_outbound_skimmed_fees_opt, optional_vec),
71707110
(37, holding_cell_skimmed_fees_opt, optional_vec),
71717111
});
@@ -7362,7 +7302,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
73627302
channel_type: channel_type.unwrap(),
73637303
channel_keys_id,
73647304

7365-
pending_monitor_updates: pending_monitor_updates.unwrap(),
7305+
blocked_monitor_updates: blocked_monitor_updates.unwrap(),
73667306
}
73677307
})
73687308
}

0 commit comments

Comments
 (0)