Skip to content

Commit 4041f08

Browse files
committed
Move in-flight ChannelMonitorUpdates to ChannelManager
Because `ChannelMonitorUpdate`s can be generated for a channel which is already closed, and must still be tracked through their completion, storing them in a `Channel` doesn't make sense - we'd have to have a redundant place to put them post-closure and handle both storage locations equivalently. Instead, here, we move to storing in-flight `ChannelMonitorUpdate`s to the `ChannelManager`, leaving blocked `ChannelMonitorUpdate`s in the `Channel` as they were.
1 parent 1c7b692 commit 4041f08

File tree

3 files changed

+202
-112
lines changed

3 files changed

+202
-112
lines changed

lightning/src/ln/channel.rs

+27-61
Original file line numberDiff line numberDiff line change
@@ -2264,34 +2264,25 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
22642264
}
22652265

22662266
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
2267-
let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
2267+
let release_cs_monitor = self.context.pending_monitor_updates.is_empty();
22682268
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
22692269
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
22702270
// Even if we aren't supposed to let new monitor updates with commitment state
22712271
// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
22722272
// matter what. Sadly, to push a new monitor update which flies before others
22732273
// already queued, we have to insert it into the pending queue and update the
22742274
// update_ids of all the following monitors.
2275-
let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
2275+
if release_cs_monitor && msg.is_some() {
22762276
let mut additional_update = self.build_commitment_no_status_check(logger);
22772277
// build_commitment_no_status_check may bump latest_monitor_id but we want them
22782278
// to be strictly increasing by one, so decrement it here.
22792279
self.context.latest_monitor_update_id = monitor_update.update_id;
22802280
monitor_update.updates.append(&mut additional_update.updates);
2281-
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
2282-
update: monitor_update, blocked: false,
2283-
});
2284-
self.context.pending_monitor_updates.len() - 1
22852281
} else {
2286-
let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
2287-
.unwrap_or(self.context.pending_monitor_updates.len());
2288-
let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
2282+
let new_mon_id = self.context.pending_monitor_updates.get(0)
22892283
.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
22902284
monitor_update.update_id = new_mon_id;
2291-
self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
2292-
update: monitor_update, blocked: false,
2293-
});
2294-
for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
2285+
for held_update in self.context.pending_monitor_updates.iter_mut() {
22952286
held_update.update.update_id += 1;
22962287
}
22972288
if msg.is_some() {
@@ -2301,14 +2292,10 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
23012292
update, blocked: true,
23022293
});
23032294
}
2304-
insert_pos
2305-
};
2306-
self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
2307-
UpdateFulfillCommitFetch::NewClaim {
2308-
monitor_update: self.context.pending_monitor_updates.get(unblocked_update_pos)
2309-
.expect("We just pushed the monitor update").update.clone(),
2310-
htlc_value_msat,
23112295
}
2296+
2297+
self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
2298+
UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, }
23122299
},
23132300
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
23142301
}
@@ -3349,8 +3336,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
33493336
}
33503337

33513338
match self.free_holding_cell_htlcs(logger) {
3352-
(Some(_), htlcs_to_fail) => {
3353-
let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
3339+
(Some(mut additional_update), htlcs_to_fail) => {
33543340
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
33553341
// strictly increasing by one, so decrement it here.
33563342
self.context.latest_monitor_update_id = monitor_update.update_id;
@@ -3566,12 +3552,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
35663552
{
35673553
assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
35683554
self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
3569-
let mut found_blocked = false;
3570-
self.context.pending_monitor_updates.retain(|upd| {
3571-
if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
3572-
if upd.blocked { found_blocked = true; }
3573-
upd.blocked
3574-
});
3555+
for upd in self.context.pending_monitor_updates.iter() {
3556+
debug_assert!(upd.blocked);
3557+
}
35753558

35763559
// If we're past (or at) the FundingSent stage on an outbound channel, try to
35773560
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4439,48 +4422,31 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
44394422
/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
44404423
/// further blocked monitor update exists after the next.
44414424
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
4442-
for i in 0..self.context.pending_monitor_updates.len() {
4443-
if self.context.pending_monitor_updates[i].blocked {
4444-
self.context.pending_monitor_updates[i].blocked = false;
4445-
return Some((self.context.pending_monitor_updates[i].update.clone(),
4446-
self.context.pending_monitor_updates.len() > i + 1));
4447-
}
4425+
for upd in self.context.pending_monitor_updates.iter() {
4426+
debug_assert!(upd.blocked);
44484427
}
4449-
None
4428+
if self.context.pending_monitor_updates.is_empty() { return None; }
4429+
Some((self.context.pending_monitor_updates.remove(0).update,
4430+
!self.context.pending_monitor_updates.is_empty()))
44504431
}
44514432

44524433
/// Pushes a new monitor update into our monitor update queue, returning it if it should be
44534434
/// immediately given to the user for persisting or `None` if it should be held as blocked.
44544435
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
44554436
-> Option<ChannelMonitorUpdate> {
4456-
let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
4457-
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
4458-
update, blocked: !release_monitor,
4459-
});
4460-
if release_monitor { self.context.pending_monitor_updates.last().map(|upd| upd.update.clone()) } else { None }
4461-
}
4462-
4463-
pub fn no_monitor_updates_pending(&self) -> bool {
4464-
self.context.pending_monitor_updates.is_empty()
4465-
}
4466-
4467-
pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
4468-
self.context.pending_monitor_updates.retain(|upd| {
4469-
if upd.update.update_id <= update_id {
4470-
assert!(!upd.blocked, "Completed update must have flown");
4471-
false
4472-
} else { true }
4473-
});
4474-
}
4475-
4476-
pub fn complete_one_mon_update(&mut self, update_id: u64) {
4477-
self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
4437+
let release_monitor = self.context.pending_monitor_updates.is_empty();
4438+
if !release_monitor {
4439+
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
4440+
update, blocked: true,
4441+
});
4442+
None
4443+
} else {
4444+
Some(update)
4445+
}
44784446
}
44794447

4480-
/// Returns an iterator over all unblocked monitor updates which have not yet completed.
4481-
pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
4482-
self.context.pending_monitor_updates.iter()
4483-
.filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
4448+
pub fn blocked_monitor_updates_pending(&self) -> usize {
4449+
self.context.pending_monitor_updates.len()
44844450
}
44854451

44864452
/// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.

0 commit comments

Comments
 (0)