Commit 224fe6b

Move in-flight ChannelMonitorUpdates to ChannelManager
Because `ChannelMonitorUpdate`s can be generated for a channel which is already closed, and must still be tracked through their completion, storing them in a `Channel` doesn't make sense - we'd need a redundant place to put them post-closure and would have to handle both storage locations equivalently. Instead, here, we move to storing in-flight `ChannelMonitorUpdate`s in the `ChannelManager`, leaving blocked `ChannelMonitorUpdate`s in the `Channel` as they were.
1 parent 1216372 commit 224fe6b
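In rough terms, the new bookkeeping looks like the sketch below. These are simplified stand-in types rather than the real LDK structs, and `track_in_flight` / `complete_through` are hypothetical helpers: unblocked updates awaiting persistence are tracked per peer, keyed by funding outpoint, so they can outlive the `Channel` they were generated for.

use std::collections::BTreeMap;

// Simplified stand-ins for the real LDK types.
#[derive(Clone, Debug)]
struct ChannelMonitorUpdate { update_id: u64 }

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct OutPoint { txid: [u8; 32], index: u16 }

// In-flight (already released, not yet persisted) updates live with the peer,
// keyed by funding outpoint, so they can outlive the `Channel` itself.
#[derive(Default)]
struct PeerState { in_flight_monitor_updates: BTreeMap<OutPoint, Vec<ChannelMonitorUpdate>> }

impl PeerState {
	// Hypothetical helper: record an update as in-flight before handing it to the persister.
	fn track_in_flight(&mut self, funding_txo: OutPoint, update: ChannelMonitorUpdate) {
		self.in_flight_monitor_updates.entry(funding_txo).or_default().push(update);
	}

	// Hypothetical helper mirroring completion handling: drop everything at or below
	// the highest applied update_id and report how many updates remain in flight.
	fn complete_through(&mut self, funding_txo: OutPoint, highest_applied_update_id: u64) -> usize {
		if let Some(pending) = self.in_flight_monitor_updates.get_mut(&funding_txo) {
			pending.retain(|upd| upd.update_id > highest_applied_update_id);
			pending.len()
		} else { 0 }
	}
}

fn main() {
	let funding_txo = OutPoint { txid: [0; 32], index: 0 };
	let mut peer = PeerState::default();
	peer.track_in_flight(funding_txo, ChannelMonitorUpdate { update_id: 1 });
	peer.track_in_flight(funding_txo, ChannelMonitorUpdate { update_id: 2 });
	assert_eq!(peer.complete_through(funding_txo, 1), 1);
	assert_eq!(peer.complete_through(funding_txo, 2), 0);
}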

File tree

3 files changed: +105 −90 lines


lightning/src/ln/channel.rs

+27 −60
@@ -2261,50 +2261,38 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	}
 
 	pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
-		let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
+		let release_cs_monitor = self.context.pending_monitor_updates.is_empty();
 		match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
 			UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
 				// Even if we aren't supposed to let new monitor updates with commitment state
 				// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
 				// matter what. Sadly, to push a new monitor update which flies before others
 				// already queued, we have to insert it into the pending queue and update the
 				// update_ids of all the following monitors.
-				let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
+				if release_cs_monitor && msg.is_some() {
 					let mut additional_update = self.build_commitment_no_status_check(logger);
 					// build_commitment_no_status_check may bump latest_monitor_id but we want them
 					// to be strictly increasing by one, so decrement it here.
 					self.context.latest_monitor_update_id = monitor_update.update_id;
 					monitor_update.updates.append(&mut additional_update.updates);
-					self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-						update: monitor_update, blocked: false,
-					});
-					self.context.pending_monitor_updates.len() - 1
 				} else {
-					let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
-						.unwrap_or(self.context.pending_monitor_updates.len());
-					let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
+					let new_mon_id = self.context.pending_monitor_updates.get(0)
 						.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
 					monitor_update.update_id = new_mon_id;
-					self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
-						update: monitor_update, blocked: false,
-					});
-					for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
+					for held_update in self.context.pending_monitor_updates.iter_mut() {
 						held_update.update.update_id += 1;
 					}
 					if msg.is_some() {
-						debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
 						let update = self.build_commitment_no_status_check(logger);
 						self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
 							update, blocked: true,
 						});
 					}
-					insert_pos
-				};
+				}
+
 				self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
 				UpdateFulfillCommitFetch::NewClaim {
-					monitor_update: self.context.pending_monitor_updates.get(unblocked_update_pos)
-						.expect("We just pushed the monitor update").update.clone(),
-					htlc_value_msat,
+					monitor_update, htlc_value_msat,
 				}
 			},
 			UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
@@ -3341,8 +3329,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 		}
 
 		match self.free_holding_cell_htlcs(logger) {
-			(Some(_), htlcs_to_fail) => {
-				let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
+			(Some(mut additional_update), htlcs_to_fail) => {
 				// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
 				// strictly increasing by one, so decrement it here.
 				self.context.latest_monitor_update_id = monitor_update.update_id;
@@ -3558,12 +3545,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	{
 		assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
 		self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
-		let mut found_blocked = false;
-		self.context.pending_monitor_updates.retain(|upd| {
-			if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
-			if upd.blocked { found_blocked = true; }
-			upd.blocked
-		});
+		for upd in self.context.pending_monitor_updates.iter() {
+			debug_assert!(upd.blocked);
+		}
 
 		// If we're past (or at) the FundingSent stage on an outbound channel, try to
 		// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4430,48 +4414,31 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
 	/// further blocked monitor update exists after the next.
 	pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
-		for i in 0..self.context.pending_monitor_updates.len() {
-			if self.context.pending_monitor_updates[i].blocked {
-				self.context.pending_monitor_updates[i].blocked = false;
-				return Some((self.context.pending_monitor_updates[i].update.clone(),
-					self.context.pending_monitor_updates.len() > i + 1));
-			}
+		for upd in self.context.pending_monitor_updates.iter() {
+			debug_assert!(upd.blocked);
 		}
-		None
+		if self.context.pending_monitor_updates.is_empty() { return None; }
+		Some((self.context.pending_monitor_updates.remove(0).update,
+			!self.context.pending_monitor_updates.is_empty()))
 	}
 
 	/// Pushes a new monitor update into our monitor update queue, returning it if it should be
 	/// immediately given to the user for persisting or `None` if it should be held as blocked.
 	fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
 	-> Option<ChannelMonitorUpdate> {
-		let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
-		self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-			update, blocked: !release_monitor,
-		});
-		if release_monitor { self.context.pending_monitor_updates.last().map(|upd| upd.update.clone()) } else { None }
-	}
-
-	pub fn no_monitor_updates_pending(&self) -> bool {
-		self.context.pending_monitor_updates.is_empty()
-	}
-
-	pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
-		self.context.pending_monitor_updates.retain(|upd| {
-			if upd.update.update_id <= update_id {
-				assert!(!upd.blocked, "Completed update must have flown");
-				false
-			} else { true }
-		});
-	}
-
-	pub fn complete_one_mon_update(&mut self, update_id: u64) {
-		self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
+		let release_monitor = self.context.pending_monitor_updates.is_empty();
+		if !release_monitor {
+			self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+				update, blocked: true,
+			});
+			None
+		} else {
+			Some(update)
+		}
 	}
 
-	/// Returns an iterator over all unblocked monitor updates which have not yet completed.
-	pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
-		self.context.pending_monitor_updates.iter()
-			.filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
+	pub fn blocked_monitor_updates_pending(&self) -> usize {
+		self.context.pending_monitor_updates.len()
 	}
 
 	/// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.

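The effect on the `Channel` side is that `pending_monitor_updates` now only ever holds blocked updates, so "is anything blocked?" reduces to `is_empty()`. Below is a minimal sketch of those semantics using simplified stand-in types, not the actual `Channel` API (`push_ret_blockable` and `unblock_next` are shortened, hypothetical names):

// Simplified stand-ins; `Channel` here models only the blocked-update queue,
// not the real channel state machine.
#[derive(Clone, Debug)]
struct ChannelMonitorUpdate { update_id: u64 }

struct PendingChannelMonitorUpdate { update: ChannelMonitorUpdate }

#[derive(Default)]
struct Channel { pending_monitor_updates: Vec<PendingChannelMonitorUpdate> }

impl Channel {
	// Mirrors `push_ret_blockable_mon_update`: if nothing is blocked, hand the
	// update straight back to the caller; otherwise queue it behind the block.
	fn push_ret_blockable(&mut self, update: ChannelMonitorUpdate) -> Option<ChannelMonitorUpdate> {
		if self.pending_monitor_updates.is_empty() {
			Some(update)
		} else {
			self.pending_monitor_updates.push(PendingChannelMonitorUpdate { update });
			None
		}
	}

	// Mirrors `unblock_next_blocked_monitor_update`: release the oldest blocked
	// update and report whether more remain behind it.
	fn unblock_next(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
		if self.pending_monitor_updates.is_empty() { return None; }
		Some((self.pending_monitor_updates.remove(0).update, !self.pending_monitor_updates.is_empty()))
	}
}

fn main() {
	let mut chan = Channel::default();
	// Nothing is blocked, so the first update is released immediately.
	assert!(chan.push_ret_blockable(ChannelMonitorUpdate { update_id: 1 }).is_some());
	// Pretend update 2 got blocked by queueing it directly; update 3 then queues behind it.
	chan.pending_monitor_updates.push(PendingChannelMonitorUpdate { update: ChannelMonitorUpdate { update_id: 2 } });
	assert!(chan.push_ret_blockable(ChannelMonitorUpdate { update_id: 3 }).is_none());
	let (first, more) = chan.unblock_next().unwrap();
	assert_eq!(first.update_id, 2);
	assert!(more);
}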
lightning/src/ln/channelmanager.rs

+77 −30
@@ -628,6 +628,13 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// Messages to send to the peer - pushed to in the same lock that they are generated in (except
 	/// for broadcast messages, where ordering isn't as strict).
 	pub(super) pending_msg_events: Vec<MessageSendEvent>,
+	/// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the
+	/// user but which have not yet completed.
+	///
+	/// Note that the channel may no longer exist. For example if the channel was closed but we
+	/// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
+	/// for a missing channel.
+	in_flight_monitor_updates: BTreeMap<OutPoint, Vec<ChannelMonitorUpdate>>,
 	/// Map from a specific channel to some action(s) that should be taken when all pending
 	/// [`ChannelMonitorUpdate`]s for the channel complete updating.
 	///
@@ -663,6 +670,7 @@ impl <Signer: ChannelSigner> PeerState<Signer> {
 			return false
 		}
 		self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty()
+			&& self.in_flight_monitor_updates.is_empty()
 	}
 
 	// Returns a count of all channels we have with this peer, including pending channels.
@@ -1855,7 +1863,7 @@ macro_rules! handle_monitor_update_completion {
 }
 
 macro_rules! handle_new_monitor_update {
-	($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { {
+	($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { {
 		// update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
 		// any case so that it won't deadlock.
 		debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
@@ -1880,20 +1888,33 @@ macro_rules! handle_new_monitor_update {
 				res
 			},
 			ChannelMonitorUpdateStatus::Completed => {
-				$chan.complete_one_mon_update($update_id);
-				if $chan.no_monitor_updates_pending() {
-					handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
-				}
+				$completed;
 				Ok(true)
 			},
 		}
 	} };
+	($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => {
+		handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state,
+			$per_peer_state_lock, $chan, _internal, $remove,
+			handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan))
+	};
 	($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => {
 		handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry())
 	};
 	($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
-		let update_res = $self.chain_monitor.update_channel($funding_txo, &$update);
-		handle_new_monitor_update!($self, update_res, $update.update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan, MANUALLY_REMOVING_INITIAL_MONITOR, $remove)
+		let update_id = $update.update_id;
+		let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
+			.or_insert_with(Vec::new);
+		in_flight_updates.push($update);
+		let update_res = $self.chain_monitor.update_channel($funding_txo, in_flight_updates.last().unwrap());
+		handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state,
+			$per_peer_state_lock, $chan, _internal, $remove,
+			{
+				in_flight_updates.pop();
+				if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
+					handle_monitor_update_completion!($self, update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
+				}
+			})
 	} };
 	($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
 		handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
@@ -4032,15 +4053,14 @@ where
 				match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) {
 					hash_map::Entry::Occupied(mut chan) => {
 						updated_chan = true;
-						handle_new_monitor_update!(self, funding_txo, update,
+						handle_new_monitor_update!(self, funding_txo, update.clone(),
 							peer_state_lock, peer_state, per_peer_state, chan).map(|_| ())
 					},
 					hash_map::Entry::Vacant(_) => Ok(()),
 				}
 			} else { Ok(()) }
 		};
 		if !updated_chan {
-			// TODO: Track this as in-flight even though the channel is closed.
 			let _ = self.chain_monitor.update_channel(funding_txo, &update);
 		}
 		// TODO: If this channel has since closed, we're likely providing a payment
@@ -4831,8 +4851,14 @@ where
 				hash_map::Entry::Vacant(_) => return,
 			}
 		};
-		log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}",
-			highest_applied_update_id, channel.get().context.get_latest_monitor_update_id());
+		let remaining_in_flight =
+			if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) {
+				pending.retain(|upd| upd.update_id > highest_applied_update_id);
+				pending.len()
+			} else { 0 };
+		log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.",
+			highest_applied_update_id, channel.get().context.get_latest_monitor_update_id(),
+			remaining_in_flight);
 		if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id {
 			return;
 		}
@@ -6959,6 +6985,7 @@ where
 					inbound_v1_channel_by_id: HashMap::new(),
 					latest_features: init_msg.features.clone(),
 					pending_msg_events: Vec::new(),
+					in_flight_monitor_updates: BTreeMap::new(),
 					monitor_update_blocked_actions: BTreeMap::new(),
 					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					is_connected: true,
@@ -7792,6 +7819,16 @@ where
 			pending_claiming_payments = None;
 		}
 
+		let mut in_flight_monitor_updates: Option<HashMap<(&PublicKey, &OutPoint), &Vec<ChannelMonitorUpdate>>> = None;
+		for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
+			for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() {
+				if !updates.is_empty() {
+					if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(HashMap::new()); }
+					in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates);
+				}
+			}
+		}
+
 		write_tlv_fields!(writer, {
 			(1, pending_outbound_payments_no_retry, required),
 			(2, pending_intercepted_htlcs, option),
@@ -7802,6 +7839,7 @@ where
 			(7, self.fake_scid_rand_bytes, required),
 			(8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
 			(9, htlc_purposes, vec_type),
+			(10, in_flight_monitor_updates, option),
 			(11, self.probing_cookie_secret, required),
 			(13, htlc_onion_fields, optional_vec),
 		});
@@ -8080,7 +8118,6 @@ where
 					log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}",
 						log_bytes!(channel.context.channel_id()), channel.context.get_latest_monitor_update_id(),
 						monitor.get_latest_update_id());
-					channel.complete_all_mon_updates_through(monitor.get_latest_update_id());
 					if let Some(short_channel_id) = channel.context.get_short_channel_id() {
 						short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
 					}
@@ -8166,6 +8203,7 @@ where
 				inbound_v1_channel_by_id: HashMap::new(),
 				latest_features: Readable::read(reader)?,
 				pending_msg_events: Vec::new(),
+				in_flight_monitor_updates: BTreeMap::new(),
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: false,
@@ -8197,24 +8235,6 @@ where
 			}
 		}
 
-		for (node_id, peer_mtx) in per_peer_state.iter() {
-			let peer_state = peer_mtx.lock().unwrap();
-			for (_, chan) in peer_state.channel_by_id.iter() {
-				for update in chan.uncompleted_unblocked_mon_updates() {
-					if let Some(funding_txo) = chan.context.get_funding_txo() {
-						log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}",
-							update.update_id, log_bytes!(funding_txo.to_channel_id()));
-						pending_background_events.push(
-							BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-								counterparty_node_id: *node_id, funding_txo, update: update.clone(),
-							});
-					} else {
-						return Err(DecodeError::InvalidValue);
-					}
-				}
-			}
-		}
-
 		let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111
 		let highest_seen_timestamp: u32 = Readable::read(reader)?;
 
@@ -8251,6 +8271,7 @@ where
 		let mut pending_claiming_payments = Some(HashMap::new());
 		let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
 		let mut events_override = None;
+		let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
 			(2, pending_intercepted_htlcs, option),
@@ -8261,6 +8282,7 @@ where
 			(7, fake_scid_rand_bytes, option),
 			(8, events_override, option),
 			(9, claimable_htlc_purposes, vec_type),
+			(10, in_flight_monitor_updates, option),
 			(11, probing_cookie_secret, option),
 			(13, claimable_htlc_onion_fields, optional_vec),
 		});
@@ -8294,6 +8316,31 @@ where
 			retry_lock: Mutex::new(())
 		};
 
+		if let Some(in_flight_upds) = &mut in_flight_monitor_updates {
+			for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() {
+				let mut peer_state_lock = peer_state_mtx.lock().unwrap();
+				let peer_state = &mut *peer_state_lock;
+				for (_, chan) in peer_state.channel_by_id.iter() {
+					if let Some(funding_txo) = chan.context.get_funding_txo() {
+						if let Some(mut peer_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) {
+							let monitor = args.channel_monitors.get(&funding_txo).unwrap();
+							peer_in_flight_upds.retain(|upd| upd.update_id > monitor.get_latest_update_id());
+							for update in peer_in_flight_upds.iter() {
+								log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}",
+									update.update_id, log_bytes!(funding_txo.to_channel_id()));
+								pending_background_events.push(
+									BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+										counterparty_node_id: *counterparty_id,
+										funding_txo, update: update.clone(),
+									});
+							}
+							peer_state.in_flight_monitor_updates.insert(funding_txo, peer_in_flight_upds);
+						}
+					} else { debug_assert!(false, "We already loaded a channel, it has to have been funded"); }
+				}
+			}
+		}
+
 		{
 			// If we're tracking pending payments, ensure we haven't lost any by looking at the
 			// ChannelMonitor data for any channels for which we do not have authorative state

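On the `ChannelManager` side, in-flight updates are now written out under TLV type 10 and, on reload, any update newer than the corresponding persisted `ChannelMonitor` is replayed as a `BackgroundEvent::MonitorUpdateRegeneratedOnStartup`. A minimal sketch of that filtering rule follows (`updates_to_replay` is a hypothetical helper with simplified types, not an LDK API):

// Keep only the in-flight updates the persisted monitor has not yet seen;
// those are the ones re-queued as background events after a restart.
#[derive(Clone, Debug, PartialEq)]
struct ChannelMonitorUpdate { update_id: u64 }

fn updates_to_replay(
	mut in_flight: Vec<ChannelMonitorUpdate>,
	monitor_latest_update_id: u64,
) -> Vec<ChannelMonitorUpdate> {
	in_flight.retain(|upd| upd.update_id > monitor_latest_update_id);
	in_flight
}

fn main() {
	let in_flight = vec![
		ChannelMonitorUpdate { update_id: 5 },
		ChannelMonitorUpdate { update_id: 6 },
		ChannelMonitorUpdate { update_id: 7 },
	];
	// The monitor on disk already applied update 6, so only update 7 gets replayed.
	assert_eq!(updates_to_replay(in_flight, 6), vec![ChannelMonitorUpdate { update_id: 7 }]);
}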
lightning/src/util/ser.rs

+1
@@ -847,6 +847,7 @@ impl Readable for Vec<u8> {
 }
 
 impl_for_vec!(ecdsa::Signature);
+impl_for_vec!(crate::chain::channelmonitor::ChannelMonitorUpdate);
 impl_for_vec!(crate::ln::channelmanager::MonitorUpdateCompletionAction);
 impl_for_vec!((A, B), A, B);
 impl_writeable_for_vec!(&crate::routing::router::BlindedTail);
