Skip to content

Commit e94647c

Browse files
authored
Merge pull request #2111 from TheBlueMatt/2023-03-sent-persist-order-prep
Setup Support for delaying `ChannelMonitorUpdate` flight until an `Event` completes
2 parents bb38ed3 + 9dfe42c commit e94647c

File tree

5 files changed

+426
-180
lines changed

lightning/src/ln/chanmon_update_fail_tests.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ fn test_monitor_and_persister_update_fail() {
146146
let mut node_0_per_peer_lock;
147147
let mut node_0_peer_state_lock;
148148
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
149-
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
149+
if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
150150
// Check that even though the persister is returning an InProgress,
151151
// because the update is bogus, ultimately the error that's returned
152152
// should be a PermanentFailure.

lightning/src/ln/channel.rs

+128-45
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,21 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
479479
/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
480480
pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5;
481481

482+
struct PendingChannelMonitorUpdate {
483+
update: ChannelMonitorUpdate,
484+
/// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
485+
/// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
486+
/// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
487+
///
488+
/// [`ChannelManager`]: super::channelmanager::ChannelManager
489+
blocked: bool,
490+
}
491+
492+
impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
493+
(0, update, required),
494+
(2, blocked, required),
495+
});
496+
482497
// TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
483498
// has been completed, and then turn into a Channel to get compile-time enforcement of things like
484499
// calling channel_id() before we're set up or things like get_outbound_funding_signed on an
@@ -744,7 +759,7 @@ pub(super) struct Channel<Signer: ChannelSigner> {
744759
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
745760
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
746761
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
747-
pending_monitor_updates: Vec<ChannelMonitorUpdate>,
762+
pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
748763
}
749764

750765
#[cfg(any(test, fuzzing))]
@@ -1995,28 +2010,52 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
19952010
}
19962011

19972012
pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
2013+
let release_cs_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
19982014
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
1999-
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
2000-
let mut additional_update = self.build_commitment_no_status_check(logger);
2001-
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
2002-
// strictly increasing by one, so decrement it here.
2003-
self.latest_monitor_update_id = monitor_update.update_id;
2004-
monitor_update.updates.append(&mut additional_update.updates);
2005-
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
2006-
self.pending_monitor_updates.push(monitor_update);
2015+
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
2016+
// Even if we aren't supposed to let new monitor updates with commitment state
2017+
// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
2018+
// matter what. Sadly, to push a new monitor update which flies before others
2019+
// already queued, we have to insert it into the pending queue and update the
2020+
// update_ids of all the following monitors.
2021+
let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
2022+
let mut additional_update = self.build_commitment_no_status_check(logger);
2023+
// build_commitment_no_status_check may bump latest_monitor_id but we want them
2024+
// to be strictly increasing by one, so decrement it here.
2025+
self.latest_monitor_update_id = monitor_update.update_id;
2026+
monitor_update.updates.append(&mut additional_update.updates);
2027+
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
2028+
update: monitor_update, blocked: false,
2029+
});
2030+
self.pending_monitor_updates.len() - 1
2031+
} else {
2032+
let insert_pos = self.pending_monitor_updates.iter().position(|upd| upd.blocked)
2033+
.unwrap_or(self.pending_monitor_updates.len());
2034+
let new_mon_id = self.pending_monitor_updates.get(insert_pos)
2035+
.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
2036+
monitor_update.update_id = new_mon_id;
2037+
self.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
2038+
update: monitor_update, blocked: false,
2039+
});
2040+
for held_update in self.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
2041+
held_update.update.update_id += 1;
2042+
}
2043+
if msg.is_some() {
2044+
debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
2045+
let update = self.build_commitment_no_status_check(logger);
2046+
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
2047+
update, blocked: true,
2048+
});
2049+
}
2050+
insert_pos
2051+
};
2052+
self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
20072053
UpdateFulfillCommitFetch::NewClaim {
2008-
monitor_update: self.pending_monitor_updates.last().unwrap(),
2054+
monitor_update: &self.pending_monitor_updates.get(unblocked_update_pos)
2055+
.expect("We just pushed the monitor update").update,
20092056
htlc_value_msat,
20102057
}
20112058
},
2012-
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
2013-
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
2014-
self.pending_monitor_updates.push(monitor_update);
2015-
UpdateFulfillCommitFetch::NewClaim {
2016-
monitor_update: self.pending_monitor_updates.last().unwrap(),
2017-
htlc_value_msat,
2018-
}
2019-
}
20202059
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
20212060
}
20222061
}
@@ -3084,7 +3123,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
30843123
Ok(())
30853124
}
30863125

3087-
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
3126+
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
30883127
where L::Target: Logger
30893128
{
30903129
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3284,8 +3323,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
32843323
}
32853324
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
32863325
log_bytes!(self.channel_id));
3287-
self.pending_monitor_updates.push(monitor_update);
3288-
return Ok(self.pending_monitor_updates.last().unwrap());
3326+
return Ok(self.push_ret_blockable_mon_update(monitor_update));
32893327
}
32903328

32913329
let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
@@ -3302,9 +3340,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
33023340

33033341
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
33043342
log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
3305-
self.pending_monitor_updates.push(monitor_update);
33063343
self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
3307-
return Ok(self.pending_monitor_updates.last().unwrap());
3344+
return Ok(self.push_ret_blockable_mon_update(monitor_update));
33083345
}
33093346

33103347
/// Public version of the below, checking relevant preconditions first.
@@ -3419,8 +3456,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
34193456
update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
34203457

34213458
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
3422-
self.pending_monitor_updates.push(monitor_update);
3423-
(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
3459+
(self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail)
34243460
} else {
34253461
(None, Vec::new())
34263462
}
@@ -3431,7 +3467,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
34313467
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
34323468
/// generating an appropriate error *after* the channel state has been updated based on the
34333469
/// revoke_and_ack message.
3434-
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
3470+
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
34353471
where L::Target: Logger,
34363472
{
34373473
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3628,21 +3664,19 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
36283664
self.monitor_pending_failures.append(&mut revoked_htlcs);
36293665
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
36303666
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
3631-
self.pending_monitor_updates.push(monitor_update);
3632-
return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
3667+
return Ok((Vec::new(), self.push_ret_blockable_mon_update(monitor_update)));
36333668
}
36343669

36353670
match self.free_holding_cell_htlcs(logger) {
36363671
(Some(_), htlcs_to_fail) => {
3637-
let mut additional_update = self.pending_monitor_updates.pop().unwrap();
3672+
let mut additional_update = self.pending_monitor_updates.pop().unwrap().update;
36383673
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
36393674
// strictly increasing by one, so decrement it here.
36403675
self.latest_monitor_update_id = monitor_update.update_id;
36413676
monitor_update.updates.append(&mut additional_update.updates);
36423677

36433678
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3644-
self.pending_monitor_updates.push(monitor_update);
3645-
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
3679+
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
36463680
},
36473681
(None, htlcs_to_fail) => {
36483682
if require_commitment {
@@ -3656,13 +3690,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
36563690
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
36573691
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
36583692
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3659-
self.pending_monitor_updates.push(monitor_update);
3660-
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
3693+
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
36613694
} else {
36623695
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
36633696
self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3664-
self.pending_monitor_updates.push(monitor_update);
3665-
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
3697+
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
36663698
}
36673699
}
36683700
}
@@ -3851,7 +3883,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
38513883
{
38523884
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
38533885
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
3854-
self.pending_monitor_updates.clear();
3886+
let mut found_blocked = false;
3887+
self.pending_monitor_updates.retain(|upd| {
3888+
if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
3889+
if upd.blocked { found_blocked = true; }
3890+
upd.blocked
3891+
});
38553892

38563893
// If we're past (or at) the FundingSent stage on an outbound channel, try to
38573894
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4392,8 +4429,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
43924429
}],
43934430
};
43944431
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
4395-
self.pending_monitor_updates.push(monitor_update);
4396-
Some(self.pending_monitor_updates.last().unwrap())
4432+
if self.push_blockable_mon_update(monitor_update) {
4433+
self.pending_monitor_updates.last().map(|upd| &upd.update)
4434+
} else { None }
43974435
} else { None };
43984436
let shutdown = if send_shutdown {
43994437
Some(msgs::Shutdown {
@@ -4965,8 +5003,49 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
49655003
(self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
49665004
}
49675005

4968-
pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
4969-
self.pending_monitor_updates.first()
5006+
pub fn get_latest_complete_monitor_update_id(&self) -> u64 {
5007+
if self.pending_monitor_updates.is_empty() { return self.get_latest_monitor_update_id(); }
5008+
self.pending_monitor_updates[0].update.update_id - 1
5009+
}
5010+
5011+
/// Unblocks and returns the next blocked monitor update, if one exists, and a bool which
5012+
/// indicates whether further (still blocked) monitor updates are queued after it.
5013+
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
5014+
for i in 0..self.pending_monitor_updates.len() {
5015+
if self.pending_monitor_updates[i].blocked {
5016+
self.pending_monitor_updates[i].blocked = false;
5017+
return Some((&self.pending_monitor_updates[i].update,
5018+
self.pending_monitor_updates.len() > i + 1));
5019+
}
5020+
}
5021+
None
5022+
}
5023+
5024+
/// Pushes a new monitor update into our monitor update queue, returning whether it should be
5025+
/// immediately given to the user for persisting or if it should be held as blocked.
5026+
fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
5027+
let release_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
5028+
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
5029+
update, blocked: !release_monitor
5030+
});
5031+
release_monitor
5032+
}
5033+
5034+
/// Pushes a new monitor update into our monitor update queue, returning a reference to it if
5035+
/// it should be immediately given to the user for persisting or `None` if it should be held as
5036+
/// blocked.
5037+
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
5038+
-> Option<&ChannelMonitorUpdate> {
5039+
let release_monitor = self.push_blockable_mon_update(update);
5040+
if release_monitor { self.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
5041+
}
5042+
5043+
pub fn no_monitor_updates_pending(&self) -> bool {
5044+
self.pending_monitor_updates.is_empty()
5045+
}
5046+
5047+
pub fn complete_one_mon_update(&mut self, update_id: u64) {
5048+
self.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
49705049
}
49715050

49725051
/// Returns true if funding_created was sent/received.
@@ -6009,8 +6088,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
60096088
Some(_) => {
60106089
let monitor_update = self.build_commitment_no_status_check(logger);
60116090
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
6012-
self.pending_monitor_updates.push(monitor_update);
6013-
Ok(Some(self.pending_monitor_updates.last().unwrap()))
6091+
Ok(self.push_ret_blockable_mon_update(monitor_update))
60146092
},
60156093
None => Ok(None)
60166094
}
@@ -6112,8 +6190,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
61126190
}],
61136191
};
61146192
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
6115-
self.pending_monitor_updates.push(monitor_update);
6116-
Some(self.pending_monitor_updates.last().unwrap())
6193+
if self.push_blockable_mon_update(monitor_update) {
6194+
self.pending_monitor_updates.last().map(|upd| &upd.update)
6195+
} else { None }
61176196
} else { None };
61186197
let shutdown = msgs::Shutdown {
61196198
channel_id: self.channel_id,
@@ -6550,6 +6629,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
65506629
(28, holder_max_accepted_htlcs, option),
65516630
(29, self.temporary_channel_id, option),
65526631
(31, channel_pending_event_emitted, option),
6632+
(33, self.pending_monitor_updates, vec_type),
65536633
});
65546634

65556635
Ok(())
@@ -6826,6 +6906,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
68266906
let mut temporary_channel_id: Option<[u8; 32]> = None;
68276907
let mut holder_max_accepted_htlcs: Option<u16> = None;
68286908

6909+
let mut pending_monitor_updates = Some(Vec::new());
6910+
68296911
read_tlv_fields!(reader, {
68306912
(0, announcement_sigs, option),
68316913
(1, minimum_depth, option),
@@ -6848,6 +6930,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
68486930
(28, holder_max_accepted_htlcs, option),
68496931
(29, temporary_channel_id, option),
68506932
(31, channel_pending_event_emitted, option),
6933+
(33, pending_monitor_updates, vec_type),
68516934
});
68526935

68536936
let (channel_keys_id, holder_signer) = if let Some(channel_keys_id) = channel_keys_id {
@@ -7017,7 +7100,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
70177100
channel_type: channel_type.unwrap(),
70187101
channel_keys_id,
70197102

7020-
pending_monitor_updates: Vec::new(),
7103+
pending_monitor_updates: pending_monitor_updates.unwrap(),
70217104
})
70227105
}
70237106
}

0 commit comments

Comments
 (0)