Skip to content

Commit fb56bd3

Browse files
committed
DRY the pre-startup ChannelMonitorUpdate handling
This moves the common `if during_startup { push background event } else { apply ChannelMonitorUpdate }` pattern by simply inlining it in `handle_new_monitor_update`. It also ensures we always insert `ChannelMonitorUpdate`s in the pending updates set when we push the background event, avoiding a race where we push an update as a background event, then while its processing another update finishes and the post-update actions get run.
1 parent 12671cd commit fb56bd3

File tree

1 file changed

+45
-68
lines changed

1 file changed

+45
-68
lines changed

lightning/src/ln/channelmanager.rs

+45-68
Original file line numberDiff line numberDiff line change
@@ -2944,24 +2944,9 @@ macro_rules! handle_error {
29442944
/// [`ChannelMonitor`]/channel funding transaction) to begin with.
29452945
macro_rules! locked_close_channel {
29462946
($self: ident, $peer_state: expr, $channel_context: expr, $shutdown_res_mut: expr) => {{
2947-
if let Some((counterparty_node_id, funding_txo, channel_id, update)) = $shutdown_res_mut.monitor_update.take() {
2948-
if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
2949-
handle_new_monitor_update!($self, funding_txo, update, $peer_state,
2950-
$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
2951-
} else {
2952-
let in_flight_updates = $peer_state.in_flight_monitor_updates.entry(funding_txo)
2953-
.or_insert_with(Vec::new);
2954-
if !in_flight_updates.contains(&update) {
2955-
in_flight_updates.push(update.clone());
2956-
}
2957-
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
2958-
counterparty_node_id,
2959-
funding_txo,
2960-
channel_id,
2961-
update,
2962-
};
2963-
$self.pending_background_events.lock().unwrap().push(event);
2964-
}
2947+
if let Some((_, funding_txo, _, update)) = $shutdown_res_mut.monitor_update.take() {
2948+
handle_new_monitor_update!($self, funding_txo, update, $peer_state,
2949+
$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
29652950
}
29662951
// If there's a possibility that we need to generate further monitor updates for this
29672952
// channel, we need to store the last update_id of it. However, we don't want to insert
@@ -3290,8 +3275,8 @@ macro_rules! handle_new_monitor_update {
32903275
};
32913276
(
32923277
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr,
3293-
$chan_id: expr, $in_flight_updates: ident, $update_idx: ident, _internal_outer,
3294-
$completed: expr
3278+
$chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
3279+
_internal_outer, $completed: expr
32953280
) => { {
32963281
$in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
32973282
.or_insert_with(Vec::new);
@@ -3303,31 +3288,47 @@ macro_rules! handle_new_monitor_update {
33033288
$in_flight_updates.push($update);
33043289
$in_flight_updates.len() - 1
33053290
});
3306-
let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]);
3307-
handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
3291+
if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
3292+
let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]);
3293+
handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
3294+
} else {
3295+
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
3296+
// fail to persist it. This is a fairly safe assumption, however, since anything we do
3297+
// during the startup sequence should be replayed exactly if we immediately crash.
3298+
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
3299+
counterparty_node_id: $counterparty_node_id,
3300+
funding_txo: $funding_txo,
3301+
channel_id: $chan_id,
3302+
update: $in_flight_updates[$update_idx].clone(),
3303+
};
3304+
$self.pending_background_events.lock().unwrap().push(event);
3305+
false
3306+
}
33083307
} };
33093308
(
33103309
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr,
33113310
REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER
33123311
) => { {
33133312
let logger = WithChannelContext::from(&$self.logger, &$chan_context, None);
33143313
let chan_id = $chan_context.channel_id();
3314+
let counterparty_node_id = $chan_context.get_counterparty_node_id();
33153315
let in_flight_updates;
33163316
let idx;
33173317
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
3318-
in_flight_updates, idx, _internal_outer,
3318+
counterparty_node_id, in_flight_updates, idx, _internal_outer,
33193319
{
33203320
let _ = in_flight_updates.remove(idx);
33213321
})
33223322
} };
33233323
(
33243324
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
3325-
$per_peer_state_lock: expr, $logger: expr, $channel_id: expr, POST_CHANNEL_CLOSE
3325+
$per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr, POST_CHANNEL_CLOSE
33263326
) => { {
3327+
let logger = WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None);
33273328
let in_flight_updates;
33283329
let idx;
3329-
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, $logger,
3330-
$channel_id, in_flight_updates, idx, _internal_outer,
3330+
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger,
3331+
$channel_id, $counterparty_node_id, in_flight_updates, idx, _internal_outer,
33313332
{
33323333
let _ = in_flight_updates.remove(idx);
33333334
if in_flight_updates.is_empty() {
@@ -3347,10 +3348,11 @@ macro_rules! handle_new_monitor_update {
33473348
) => { {
33483349
let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
33493350
let chan_id = $chan.context.channel_id();
3351+
let counterparty_node_id = $chan.context.get_counterparty_node_id();
33503352
let in_flight_updates;
33513353
let idx;
33523354
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
3353-
in_flight_updates, idx, _internal_outer,
3355+
counterparty_node_id, in_flight_updates, idx, _internal_outer,
33543356
{
33553357
let _ = in_flight_updates.remove(idx);
33563358
if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
@@ -3980,11 +3982,10 @@ where
39803982
},
39813983
hash_map::Entry::Vacant(_) => {},
39823984
}
3983-
let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None);
39843985

39853986
handle_new_monitor_update!(
39863987
self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state,
3987-
logger, channel_id, POST_CHANNEL_CLOSE
3988+
counterparty_node_id, channel_id, POST_CHANNEL_CLOSE
39883989
);
39893990
}
39903991

@@ -7169,7 +7170,6 @@ where
71697170
let peer_state = &mut **peer_state_lock;
71707171
if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) {
71717172
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
7172-
let counterparty_node_id = chan.context.get_counterparty_node_id();
71737173
let logger = WithChannelContext::from(&self.logger, &chan.context, None);
71747174
let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, payment_info, &&logger);
71757175

@@ -7184,21 +7184,8 @@ where
71847184
if let Some(raa_blocker) = raa_blocker_opt {
71857185
peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
71867186
}
7187-
if !during_init {
7188-
handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt,
7189-
peer_state, per_peer_state, chan);
7190-
} else {
7191-
// If we're running during init we cannot update a monitor directly -
7192-
// they probably haven't actually been loaded yet. Instead, push the
7193-
// monitor update as a background event.
7194-
self.pending_background_events.lock().unwrap().push(
7195-
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
7196-
counterparty_node_id,
7197-
funding_txo: prev_hop.funding_txo,
7198-
channel_id: prev_hop.channel_id,
7199-
update: monitor_update.clone(),
7200-
});
7201-
}
7187+
handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt,
7188+
peer_state, per_peer_state, chan);
72027189
}
72037190
UpdateFulfillCommitFetch::DuplicateClaim {} => {
72047191
let (action_opt, raa_blocker_opt) = completion_action(None, true);
@@ -7313,26 +7300,10 @@ where
73137300
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
73147301
}
73157302

7316-
if !during_init {
7317-
handle_new_monitor_update!(self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state, logger, chan_id, POST_CHANNEL_CLOSE);
7318-
} else {
7319-
// If we're running during init we cannot update a monitor directly - they probably
7320-
// haven't actually been loaded yet. Instead, push the monitor update as a background
7321-
// event.
7322-
7323-
let in_flight_updates = peer_state.in_flight_monitor_updates
7324-
.entry(prev_hop.funding_txo)
7325-
.or_insert_with(Vec::new);
7326-
in_flight_updates.push(preimage_update.clone());
7327-
7328-
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
7329-
counterparty_node_id,
7330-
funding_txo: prev_hop.funding_txo,
7331-
channel_id: prev_hop.channel_id,
7332-
update: preimage_update,
7333-
};
7334-
self.pending_background_events.lock().unwrap().push(event);
7335-
}
7303+
handle_new_monitor_update!(
7304+
self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state,
7305+
counterparty_node_id, chan_id, POST_CHANNEL_CLOSE
7306+
);
73367307
}
73377308

73387309
fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -13725,14 +13696,20 @@ where
1372513696
}
1372613697
}
1372713698
}
13699+
let mut per_peer_state = per_peer_state.get(counterparty_node_id)
13700+
.expect("If we have pending updates for a channel it has to have an entry")
13701+
.lock().unwrap();
1372813702
if updated_id {
13729-
per_peer_state.get(counterparty_node_id)
13730-
.expect("If we have pending updates for a channel it has to have an entry")
13731-
.lock().unwrap()
13703+
per_peer_state
1373213704
.closed_channel_monitor_update_ids.entry(*channel_id)
1373313705
.and_modify(|v| *v = cmp::max(update.update_id, *v))
1373413706
.or_insert(update.update_id);
1373513707
}
13708+
let in_flight_updates = per_peer_state.in_flight_monitor_updates
13709+
.entry(*funding_txo)
13710+
.or_insert_with(Vec::new);
13711+
debug_assert!(!in_flight_updates.iter().any(|upd| upd == update));
13712+
in_flight_updates.push(update.clone());
1373613713
}
1373713714
pending_background_events.push(new_event);
1373813715
}

0 commit comments

Comments
 (0)