Skip to content

Commit 8fca379

Browse files
committed
DRY the pre-startup ChannelMonitorUpdate handling
This moves the common `if during_startup { push background event } else { apply ChannelMonitorUpdate }` pattern by simply inlining it in `handle_new_monitor_update`. It also ensures we always insert `ChannelMonitorUpdate`s in the pending updates set when we push the background event, avoiding a race where we push an update as a background event, then while its processing another update finishes and the post-update actions get run.
1 parent ce9e5a7 commit 8fca379

File tree

1 file changed

+45
-70
lines changed

1 file changed

+45
-70
lines changed

lightning/src/ln/channelmanager.rs

+45-70
Original file line numberDiff line numberDiff line change
@@ -2929,26 +2929,9 @@ macro_rules! handle_error {
29292929
/// [`ChannelMonitor`]/channel funding transaction) to begin with.
29302930
macro_rules! locked_close_channel {
29312931
($self: ident, $peer_state: expr, $channel_context: expr, $shutdown_res_mut: expr) => {{
2932-
if let Some((counterparty_node_id, funding_txo, channel_id, update)) = $shutdown_res_mut.monitor_update.take() {
2933-
if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
2934-
handle_new_monitor_update!($self, funding_txo, update, $peer_state,
2935-
$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
2936-
} else {
2937-
let in_flight_updates = $peer_state.in_flight_monitor_updates.entry(funding_txo)
2938-
.or_insert_with(Vec::new);
2939-
in_flight_updates.iter().position(|upd| upd == &update)
2940-
.unwrap_or_else(|| {
2941-
in_flight_updates.push(update.clone());
2942-
0
2943-
});
2944-
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
2945-
counterparty_node_id,
2946-
funding_txo,
2947-
channel_id,
2948-
update,
2949-
};
2950-
$self.pending_background_events.lock().unwrap().push(event);
2951-
}
2932+
if let Some((_, funding_txo, _, update)) = $shutdown_res_mut.monitor_update.take() {
2933+
handle_new_monitor_update!($self, funding_txo, update, $peer_state,
2934+
$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
29522935
}
29532936
// If there's a possibility that we need to generate further monitor updates for this
29542937
// channel, we need to store the last update_id of it. However, we don't want to insert
@@ -3279,8 +3262,8 @@ macro_rules! handle_new_monitor_update {
32793262
};
32803263
(
32813264
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr,
3282-
$chan_id: expr, $in_flight_updates: ident, $update_idx: ident, _internal_outer,
3283-
$completed: expr
3265+
$chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
3266+
_internal_outer, $completed: expr
32843267
) => { {
32853268
$in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
32863269
.or_insert_with(Vec::new);
@@ -3292,31 +3275,47 @@ macro_rules! handle_new_monitor_update {
32923275
$in_flight_updates.push($update);
32933276
$in_flight_updates.len() - 1
32943277
});
3295-
let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]);
3296-
handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
3278+
if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
3279+
let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]);
3280+
handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
3281+
} else {
3282+
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
3283+
// fail to persist it. This is a fairly safe assumption, however, since anything we do
3284+
// during the startup sequence should be replayed exactly if we immediately crash.
3285+
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
3286+
counterparty_node_id: $counterparty_node_id,
3287+
funding_txo: $funding_txo,
3288+
channel_id: $chan_id,
3289+
update: $in_flight_updates[$update_idx].clone(),
3290+
};
3291+
$self.pending_background_events.lock().unwrap().push(event);
3292+
false
3293+
}
32973294
} };
32983295
(
32993296
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr,
33003297
REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER
33013298
) => { {
33023299
let logger = WithChannelContext::from(&$self.logger, &$chan_context, None);
33033300
let chan_id = $chan_context.channel_id();
3301+
let counterparty_node_id = $chan_context.get_counterparty_node_id();
33043302
let in_flight_updates;
33053303
let idx;
33063304
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
3307-
in_flight_updates, idx, _internal_outer,
3305+
counterparty_node_id, in_flight_updates, idx, _internal_outer,
33083306
{
33093307
let _ = in_flight_updates.remove(idx);
33103308
})
33113309
} };
33123310
(
33133311
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
3314-
$per_peer_state_lock: expr, $logger: expr, $channel_id: expr, POST_CHANNEL_CLOSE
3312+
$per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr, POST_CHANNEL_CLOSE
33153313
) => { {
3314+
let logger = WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None);
33163315
let in_flight_updates;
33173316
let idx;
3318-
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, $logger,
3319-
$channel_id, in_flight_updates, idx, _internal_outer,
3317+
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger,
3318+
$channel_id, $counterparty_node_id, in_flight_updates, idx, _internal_outer,
33203319
{
33213320
let _ = in_flight_updates.remove(idx);
33223321
if in_flight_updates.is_empty() {
@@ -3336,10 +3335,11 @@ macro_rules! handle_new_monitor_update {
33363335
) => { {
33373336
let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
33383337
let chan_id = $chan.context.channel_id();
3338+
let counterparty_node_id = $chan.context.get_counterparty_node_id();
33393339
let in_flight_updates;
33403340
let idx;
33413341
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
3342-
in_flight_updates, idx, _internal_outer,
3342+
counterparty_node_id, in_flight_updates, idx, _internal_outer,
33433343
{
33443344
let _ = in_flight_updates.remove(idx);
33453345
if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
@@ -3964,11 +3964,10 @@ where
39643964
},
39653965
hash_map::Entry::Vacant(_) => {},
39663966
}
3967-
let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None);
39683967

39693968
handle_new_monitor_update!(
39703969
self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state,
3971-
logger, channel_id, POST_CHANNEL_CLOSE
3970+
counterparty_node_id, channel_id, POST_CHANNEL_CLOSE
39723971
);
39733972
}
39743973

@@ -7160,7 +7159,6 @@ where
71607159
let peer_state = &mut **peer_state_lock;
71617160
if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) {
71627161
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
7163-
let counterparty_node_id = chan.context.get_counterparty_node_id();
71647162
let logger = WithChannelContext::from(&self.logger, &chan.context, None);
71657163
let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, payment_info, &&logger);
71667164

@@ -7175,21 +7173,8 @@ where
71757173
if let Some(raa_blocker) = raa_blocker_opt {
71767174
peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
71777175
}
7178-
if !during_init {
7179-
handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt,
7180-
peer_state, per_peer_state, chan);
7181-
} else {
7182-
// If we're running during init we cannot update a monitor directly -
7183-
// they probably haven't actually been loaded yet. Instead, push the
7184-
// monitor update as a background event.
7185-
self.pending_background_events.lock().unwrap().push(
7186-
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
7187-
counterparty_node_id,
7188-
funding_txo: prev_hop.funding_txo,
7189-
channel_id: prev_hop.channel_id,
7190-
update: monitor_update.clone(),
7191-
});
7192-
}
7176+
handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt,
7177+
peer_state, per_peer_state, chan);
71937178
}
71947179
UpdateFulfillCommitFetch::DuplicateClaim {} => {
71957180
let (action_opt, raa_blocker_opt) = completion_action(None, true);
@@ -7304,26 +7289,10 @@ where
73047289
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
73057290
}
73067291

7307-
if !during_init {
7308-
handle_new_monitor_update!(self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state, logger, chan_id, POST_CHANNEL_CLOSE);
7309-
} else {
7310-
// If we're running during init we cannot update a monitor directly - they probably
7311-
// haven't actually been loaded yet. Instead, push the monitor update as a background
7312-
// event.
7313-
7314-
let in_flight_updates = peer_state.in_flight_monitor_updates
7315-
.entry(prev_hop.funding_txo)
7316-
.or_insert_with(Vec::new);
7317-
in_flight_updates.push(preimage_update.clone());
7318-
7319-
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
7320-
counterparty_node_id,
7321-
funding_txo: prev_hop.funding_txo,
7322-
channel_id: prev_hop.channel_id,
7323-
update: preimage_update,
7324-
};
7325-
self.pending_background_events.lock().unwrap().push(event);
7326-
}
7292+
handle_new_monitor_update!(
7293+
self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state,
7294+
counterparty_node_id, chan_id, POST_CHANNEL_CLOSE
7295+
);
73277296
}
73287297

73297298
fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -13342,14 +13311,20 @@ where
1334213311
}
1334313312
}
1334413313
}
13314+
let mut per_peer_state = per_peer_state.get(counterparty_node_id)
13315+
.expect("If we have pending updates for a channel it has to have an entry")
13316+
.lock().unwrap();
1334513317
if updated_id {
13346-
per_peer_state.get(counterparty_node_id)
13347-
.expect("If we have pending updates for a channel it has to have an entry")
13348-
.lock().unwrap()
13318+
per_peer_state
1334913319
.closed_channel_monitor_update_ids.entry(*channel_id)
1335013320
.and_modify(|v| *v = cmp::max(update.update_id, *v))
1335113321
.or_insert(update.update_id);
1335213322
}
13323+
let in_flight_updates = per_peer_state.in_flight_monitor_updates
13324+
.entry(*funding_txo)
13325+
.or_insert_with(Vec::new);
13326+
debug_assert!(!in_flight_updates.iter().any(|upd| upd == update));
13327+
in_flight_updates.push(update.clone());
1335313328
}
1335413329
pending_background_events.push(new_event);
1335513330
}

0 commit comments

Comments
 (0)