Skip to content

Commit a296dac

Browse files
committed
Always process RAA/CS ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths. Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete. This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later. This commit updates the `ChannelMonitorUpdate` pipeline for handling inbound `revoke_and_ack` and `commitment_signed` messages.
1 parent 024e384 commit a296dac

File tree

5 files changed

+65
-181
lines changed

5 files changed

+65
-181
lines changed

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ fn test_monitor_and_persister_update_fail() {
140140
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
141141
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
142142
if let Some(ref mut channel) = nodes[0].node.channel_state.lock().unwrap().by_id.get_mut(&chan.2) {
143-
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
143+
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
144144
// Check that even though the persister is returning a InProgress,
145145
// because the update is bogus, ultimately the error that's returned
146146
// should be a PermanentFailure.
@@ -1727,7 +1727,6 @@ fn test_monitor_update_on_pending_forwards() {
17271727
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
17281728
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]);
17291729
check_added_monitors!(nodes[1], 1);
1730-
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
17311730

17321731
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
17331732
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
@@ -1741,11 +1740,11 @@ fn test_monitor_update_on_pending_forwards() {
17411740

17421741
let events = nodes[0].node.get_and_clear_pending_events();
17431742
assert_eq!(events.len(), 2);
1744-
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[0] {
1743+
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[1] {
17451744
assert_eq!(payment_hash, payment_hash_1);
17461745
assert!(payment_failed_permanently);
17471746
} else { panic!("Unexpected event!"); }
1748-
match events[1] {
1747+
match events[0] {
17491748
Event::PendingHTLCsForwardable { .. } => { },
17501749
_ => panic!("Unexpected event"),
17511750
};

lightning/src/ln/channel.rs

Lines changed: 40 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -410,17 +410,6 @@ pub enum UpdateFulfillCommitFetch {
410410
DuplicateClaim {},
411411
}
412412

413-
/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC
414-
/// state.
415-
pub(super) struct RAAUpdates {
416-
pub commitment_update: Option<msgs::CommitmentUpdate>,
417-
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
418-
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
419-
pub finalized_claimed_htlcs: Vec<HTLCSource>,
420-
pub monitor_update: ChannelMonitorUpdate,
421-
pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
422-
}
423-
424413
/// The return value of `monitor_updating_restored`
425414
pub(super) struct MonitorRestoreUpdates {
426415
pub raa: Option<msgs::RevokeAndACK>,
@@ -3001,17 +2990,17 @@ impl<Signer: Sign> Channel<Signer> {
30012990
Ok(())
30022991
}
30032992

3004-
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>, ChannelMonitorUpdate), (Option<ChannelMonitorUpdate>, ChannelError)>
2993+
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
30052994
where L::Target: Logger
30062995
{
30072996
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
3008-
return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())));
2997+
return Err(ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()));
30092998
}
30102999
if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
3011-
return Err((None, ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned())));
3000+
return Err(ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned()));
30123001
}
30133002
if self.channel_state & BOTH_SIDES_SHUTDOWN_MASK == BOTH_SIDES_SHUTDOWN_MASK && self.last_sent_closing_fee.is_some() {
3014-
return Err((None, ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned())));
3003+
return Err(ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned()));
30153004
}
30163005

30173006
let funding_script = self.get_funding_redeemscript();
@@ -3029,7 +3018,7 @@ impl<Signer: Sign> Channel<Signer> {
30293018
log_bytes!(self.counterparty_funding_pubkey().serialize()), encode::serialize_hex(&bitcoin_tx.transaction),
30303019
log_bytes!(sighash[..]), encode::serialize_hex(&funding_script), log_bytes!(self.channel_id()));
30313020
if let Err(_) = self.secp_ctx.verify_ecdsa(&sighash, &msg.signature, &self.counterparty_funding_pubkey()) {
3032-
return Err((None, ChannelError::Close("Invalid commitment tx signature from peer".to_owned())));
3021+
return Err(ChannelError::Close("Invalid commitment tx signature from peer".to_owned()));
30333022
}
30343023
bitcoin_tx.txid
30353024
};
@@ -3044,7 +3033,7 @@ impl<Signer: Sign> Channel<Signer> {
30443033
debug_assert!(!self.is_outbound());
30453034
let counterparty_reserve_we_require_msat = self.holder_selected_channel_reserve_satoshis * 1000;
30463035
if commitment_stats.remote_balance_msat < commitment_stats.total_fee_sat * 1000 + counterparty_reserve_we_require_msat {
3047-
return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned())));
3036+
return Err(ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()));
30483037
}
30493038
}
30503039
#[cfg(any(test, fuzzing))]
@@ -3066,7 +3055,7 @@ impl<Signer: Sign> Channel<Signer> {
30663055
}
30673056

30683057
if msg.htlc_signatures.len() != commitment_stats.num_nondust_htlcs {
3069-
return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs))));
3058+
return Err(ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs)));
30703059
}
30713060

30723061
// TODO: Sadly, we pass HTLCs twice to ChannelMonitor: once via the HolderCommitmentTransaction and once via the update
@@ -3084,7 +3073,7 @@ impl<Signer: Sign> Channel<Signer> {
30843073
log_bytes!(msg.htlc_signatures[idx].serialize_compact()[..]), log_bytes!(keys.countersignatory_htlc_key.serialize()),
30853074
encode::serialize_hex(&htlc_tx), log_bytes!(htlc_sighash[..]), encode::serialize_hex(&htlc_redeemscript), log_bytes!(self.channel_id()));
30863075
if let Err(_) = self.secp_ctx.verify_ecdsa(&htlc_sighash, &msg.htlc_signatures[idx], &keys.countersignatory_htlc_key) {
3087-
return Err((None, ChannelError::Close("Invalid HTLC tx signature from peer".to_owned())));
3076+
return Err(ChannelError::Close("Invalid HTLC tx signature from peer".to_owned()));
30883077
}
30893078
htlcs_and_sigs.push((htlc, Some(msg.htlc_signatures[idx]), source));
30903079
} else {
@@ -3100,10 +3089,8 @@ impl<Signer: Sign> Channel<Signer> {
31003089
self.counterparty_funding_pubkey()
31013090
);
31023091

3103-
let next_per_commitment_point = self.holder_signer.get_per_commitment_point(self.cur_holder_commitment_transaction_number - 1, &self.secp_ctx);
31043092
self.holder_signer.validate_holder_commitment(&holder_commitment_tx, commitment_stats.preimages)
3105-
.map_err(|_| (None, ChannelError::Close("Failed to validate our commitment".to_owned())))?;
3106-
let per_commitment_secret = self.holder_signer.release_commitment_secret(self.cur_holder_commitment_transaction_number + 1);
3093+
.map_err(|_| ChannelError::Close("Failed to validate our commitment".to_owned()))?;
31073094

31083095
// Update state now that we've passed all the can-fail calls...
31093096
let mut need_commitment = false;
@@ -3148,7 +3135,7 @@ impl<Signer: Sign> Channel<Signer> {
31483135

31493136
self.cur_holder_commitment_transaction_number -= 1;
31503137
// Note that if we need_commitment & !AwaitingRemoteRevoke we'll call
3151-
// send_commitment_no_status_check() next which will reset this to RAAFirst.
3138+
// build_commitment_no_status_check() next which will reset this to RAAFirst.
31523139
self.resend_order = RAACommitmentOrder::CommitmentFirst;
31533140

31543141
if (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 {
@@ -3160,37 +3147,35 @@ impl<Signer: Sign> Channel<Signer> {
31603147
// the corresponding HTLC status updates so that get_last_commitment_update
31613148
// includes the right HTLCs.
31623149
self.monitor_pending_commitment_signed = true;
3163-
let (_, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
3164-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3150+
let mut additional_update = self.build_commitment_no_status_check(logger);
3151+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
31653152
// strictly increasing by one, so decrement it here.
31663153
self.latest_monitor_update_id = monitor_update.update_id;
31673154
monitor_update.updates.append(&mut additional_update.updates);
31683155
}
31693156
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
31703157
log_bytes!(self.channel_id));
3171-
return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned())));
3158+
self.pending_monitor_updates.push(monitor_update);
3159+
return Ok(self.pending_monitor_updates.last().unwrap());
31723160
}
31733161

3174-
let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
3162+
let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
31753163
// If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok -
31763164
// we'll send one right away when we get the revoke_and_ack when we
31773165
// free_holding_cell_htlcs().
3178-
let (msg, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
3179-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3166+
let mut additional_update = self.build_commitment_no_status_check(logger);
3167+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
31803168
// strictly increasing by one, so decrement it here.
31813169
self.latest_monitor_update_id = monitor_update.update_id;
31823170
monitor_update.updates.append(&mut additional_update.updates);
3183-
Some(msg)
3184-
} else { None };
3171+
true
3172+
} else { false };
31853173

31863174
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
3187-
log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" });
3188-
3189-
Ok((msgs::RevokeAndACK {
3190-
channel_id: self.channel_id,
3191-
per_commitment_secret,
3192-
next_per_commitment_point,
3193-
}, commitment_signed, monitor_update))
3175+
log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
3176+
self.pending_monitor_updates.push(monitor_update);
3177+
self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
3178+
return Ok(self.pending_monitor_updates.last().unwrap());
31943179
}
31953180

31963181
/// Public version of the below, checking relevant preconditions first.
@@ -3322,7 +3307,7 @@ impl<Signer: Sign> Channel<Signer> {
33223307
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
33233308
/// generating an appropriate error *after* the channel state has been updated based on the
33243309
/// revoke_and_ack message.
3325-
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<RAAUpdates, ChannelError>
3310+
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
33263311
where L::Target: Logger,
33273312
{
33283313
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3509,8 +3494,8 @@ impl<Signer: Sign> Channel<Signer> {
35093494
// When the monitor updating is restored we'll call get_last_commitment_update(),
35103495
// which does not update state, but we're definitely now awaiting a remote revoke
35113496
// before we can step forward any more, so set it here.
3512-
let (_, mut additional_update) = self.send_commitment_no_status_check(logger)?;
3513-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3497+
let mut additional_update = self.build_commitment_no_status_check(logger);
3498+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
35143499
// strictly increasing by one, so decrement it here.
35153500
self.latest_monitor_update_id = monitor_update.update_id;
35163501
monitor_update.updates.append(&mut additional_update.updates);
@@ -3519,12 +3504,8 @@ impl<Signer: Sign> Channel<Signer> {
35193504
self.monitor_pending_failures.append(&mut revoked_htlcs);
35203505
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
35213506
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
3522-
return Ok(RAAUpdates {
3523-
commitment_update: None, finalized_claimed_htlcs: Vec::new(),
3524-
accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
3525-
monitor_update,
3526-
holding_cell_failed_htlcs: Vec::new()
3527-
});
3507+
self.pending_monitor_updates.push(monitor_update);
3508+
return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
35283509
}
35293510

35303511
match self.free_holding_cell_htlcs(logger)? {
@@ -3543,47 +3524,29 @@ impl<Signer: Sign> Channel<Signer> {
35433524
self.latest_monitor_update_id = monitor_update.update_id;
35443525
monitor_update.updates.append(&mut additional_update.updates);
35453526

3546-
Ok(RAAUpdates {
3547-
commitment_update: Some(commitment_update),
3548-
finalized_claimed_htlcs,
3549-
accepted_htlcs: to_forward_infos,
3550-
failed_htlcs: revoked_htlcs,
3551-
monitor_update,
3552-
holding_cell_failed_htlcs: htlcs_to_fail
3553-
})
3527+
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3528+
self.pending_monitor_updates.push(monitor_update);
3529+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
35543530
},
35553531
(None, htlcs_to_fail) => {
35563532
if require_commitment {
3557-
let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
3533+
let mut additional_update = self.build_commitment_no_status_check(logger);
35583534

3559-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3535+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
35603536
// strictly increasing by one, so decrement it here.
35613537
self.latest_monitor_update_id = monitor_update.update_id;
35623538
monitor_update.updates.append(&mut additional_update.updates);
35633539

35643540
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
35653541
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
3566-
Ok(RAAUpdates {
3567-
commitment_update: Some(msgs::CommitmentUpdate {
3568-
update_add_htlcs: Vec::new(),
3569-
update_fulfill_htlcs: Vec::new(),
3570-
update_fail_htlcs,
3571-
update_fail_malformed_htlcs,
3572-
update_fee: None,
3573-
commitment_signed
3574-
}),
3575-
finalized_claimed_htlcs,
3576-
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
3577-
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
3578-
})
3542+
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3543+
self.pending_monitor_updates.push(monitor_update);
3544+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
35793545
} else {
35803546
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
3581-
Ok(RAAUpdates {
3582-
commitment_update: None,
3583-
finalized_claimed_htlcs,
3584-
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
3585-
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
3586-
})
3547+
self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3548+
self.pending_monitor_updates.push(monitor_update);
3549+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
35873550
}
35883551
}
35893552
}
@@ -3763,6 +3726,7 @@ impl<Signer: Sign> Channel<Signer> {
37633726
pub fn monitor_updating_restored<L: Deref>(&mut self, logger: &L, node_pk: PublicKey, genesis_block_hash: BlockHash, best_block_height: u32) -> MonitorRestoreUpdates where L::Target: Logger {
37643727
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
37653728
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
3729+
self.pending_monitor_updates.clear();
37663730

37673731
// If we're past (or at) the FundingSent stage on an outbound channel, try to
37683732
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we

0 commit comments

Comments
 (0)