Skip to content

Commit 08233e8

Browse files
committed
(XXX: Tests fail) Always process RAA/CS ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths. Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete. This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later. This commit updates the `ChannelMonitorUpdate` pipeline for handling inbound `revoke_and_ack` and `commitment_signed` messages.
1 parent 40f2d44 commit 08233e8

File tree

5 files changed

+66
-181
lines changed

5 files changed

+66
-181
lines changed

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ fn test_monitor_and_persister_update_fail() {
143143
let mut node_0_per_peer_lock;
144144
let mut node_0_peer_state_lock;
145145
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
146-
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
146+
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
147147
// Check that even though the persister is returning a InProgress,
148148
// because the update is bogus, ultimately the error that's returned
149149
// should be a PermanentFailure.
@@ -1739,7 +1739,6 @@ fn test_monitor_update_on_pending_forwards() {
17391739
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
17401740
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]);
17411741
check_added_monitors!(nodes[1], 1);
1742-
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
17431742

17441743
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
17451744
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
@@ -1753,11 +1752,11 @@ fn test_monitor_update_on_pending_forwards() {
17531752

17541753
let events = nodes[0].node.get_and_clear_pending_events();
17551754
assert_eq!(events.len(), 2);
1756-
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[0] {
1755+
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[1] {
17571756
assert_eq!(payment_hash, payment_hash_1);
17581757
assert!(payment_failed_permanently);
17591758
} else { panic!("Unexpected event!"); }
1760-
match events[1] {
1759+
match events[0] {
17611760
Event::PendingHTLCsForwardable { .. } => { },
17621761
_ => panic!("Unexpected event"),
17631762
};

lightning/src/ln/channel.rs

Lines changed: 40 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -410,17 +410,6 @@ pub enum UpdateFulfillCommitFetch {
410410
DuplicateClaim {},
411411
}
412412

413-
/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC
414-
/// state.
415-
pub(super) struct RAAUpdates {
416-
pub commitment_update: Option<msgs::CommitmentUpdate>,
417-
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
418-
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
419-
pub finalized_claimed_htlcs: Vec<HTLCSource>,
420-
pub monitor_update: ChannelMonitorUpdate,
421-
pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
422-
}
423-
424413
/// The return value of `monitor_updating_restored`
425414
pub(super) struct MonitorRestoreUpdates {
426415
pub raa: Option<msgs::RevokeAndACK>,
@@ -3003,17 +2992,17 @@ impl<Signer: Sign> Channel<Signer> {
30032992
Ok(())
30042993
}
30052994

3006-
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>, ChannelMonitorUpdate), (Option<ChannelMonitorUpdate>, ChannelError)>
2995+
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
30072996
where L::Target: Logger
30082997
{
30092998
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
3010-
return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())));
2999+
return Err(ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()));
30113000
}
30123001
if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
3013-
return Err((None, ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned())));
3002+
return Err(ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned()));
30143003
}
30153004
if self.channel_state & BOTH_SIDES_SHUTDOWN_MASK == BOTH_SIDES_SHUTDOWN_MASK && self.last_sent_closing_fee.is_some() {
3016-
return Err((None, ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned())));
3005+
return Err(ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned()));
30173006
}
30183007

30193008
let funding_script = self.get_funding_redeemscript();
@@ -3031,7 +3020,7 @@ impl<Signer: Sign> Channel<Signer> {
30313020
log_bytes!(self.counterparty_funding_pubkey().serialize()), encode::serialize_hex(&bitcoin_tx.transaction),
30323021
log_bytes!(sighash[..]), encode::serialize_hex(&funding_script), log_bytes!(self.channel_id()));
30333022
if let Err(_) = self.secp_ctx.verify_ecdsa(&sighash, &msg.signature, &self.counterparty_funding_pubkey()) {
3034-
return Err((None, ChannelError::Close("Invalid commitment tx signature from peer".to_owned())));
3023+
return Err(ChannelError::Close("Invalid commitment tx signature from peer".to_owned()));
30353024
}
30363025
bitcoin_tx.txid
30373026
};
@@ -3046,7 +3035,7 @@ impl<Signer: Sign> Channel<Signer> {
30463035
debug_assert!(!self.is_outbound());
30473036
let counterparty_reserve_we_require_msat = self.holder_selected_channel_reserve_satoshis * 1000;
30483037
if commitment_stats.remote_balance_msat < commitment_stats.total_fee_sat * 1000 + counterparty_reserve_we_require_msat {
3049-
return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned())));
3038+
return Err(ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()));
30503039
}
30513040
}
30523041
#[cfg(any(test, fuzzing))]
@@ -3068,7 +3057,7 @@ impl<Signer: Sign> Channel<Signer> {
30683057
}
30693058

30703059
if msg.htlc_signatures.len() != commitment_stats.num_nondust_htlcs {
3071-
return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs))));
3060+
return Err(ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs)));
30723061
}
30733062

30743063
// TODO: Sadly, we pass HTLCs twice to ChannelMonitor: once via the HolderCommitmentTransaction and once via the update
@@ -3086,7 +3075,7 @@ impl<Signer: Sign> Channel<Signer> {
30863075
log_bytes!(msg.htlc_signatures[idx].serialize_compact()[..]), log_bytes!(keys.countersignatory_htlc_key.serialize()),
30873076
encode::serialize_hex(&htlc_tx), log_bytes!(htlc_sighash[..]), encode::serialize_hex(&htlc_redeemscript), log_bytes!(self.channel_id()));
30883077
if let Err(_) = self.secp_ctx.verify_ecdsa(&htlc_sighash, &msg.htlc_signatures[idx], &keys.countersignatory_htlc_key) {
3089-
return Err((None, ChannelError::Close("Invalid HTLC tx signature from peer".to_owned())));
3078+
return Err(ChannelError::Close("Invalid HTLC tx signature from peer".to_owned()));
30903079
}
30913080
htlcs_and_sigs.push((htlc, Some(msg.htlc_signatures[idx]), source));
30923081
} else {
@@ -3102,10 +3091,8 @@ impl<Signer: Sign> Channel<Signer> {
31023091
self.counterparty_funding_pubkey()
31033092
);
31043093

3105-
let next_per_commitment_point = self.holder_signer.get_per_commitment_point(self.cur_holder_commitment_transaction_number - 1, &self.secp_ctx);
31063094
self.holder_signer.validate_holder_commitment(&holder_commitment_tx, commitment_stats.preimages)
3107-
.map_err(|_| (None, ChannelError::Close("Failed to validate our commitment".to_owned())))?;
3108-
let per_commitment_secret = self.holder_signer.release_commitment_secret(self.cur_holder_commitment_transaction_number + 1);
3095+
.map_err(|_| ChannelError::Close("Failed to validate our commitment".to_owned()))?;
31093096

31103097
// Update state now that we've passed all the can-fail calls...
31113098
let mut need_commitment = false;
@@ -3150,7 +3137,7 @@ impl<Signer: Sign> Channel<Signer> {
31503137

31513138
self.cur_holder_commitment_transaction_number -= 1;
31523139
// Note that if we need_commitment & !AwaitingRemoteRevoke we'll call
3153-
// send_commitment_no_status_check() next which will reset this to RAAFirst.
3140+
// build_commitment_no_status_check() next which will reset this to RAAFirst.
31543141
self.resend_order = RAACommitmentOrder::CommitmentFirst;
31553142

31563143
if (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 {
@@ -3162,37 +3149,35 @@ impl<Signer: Sign> Channel<Signer> {
31623149
// the corresponding HTLC status updates so that get_last_commitment_update
31633150
// includes the right HTLCs.
31643151
self.monitor_pending_commitment_signed = true;
3165-
let (_, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
3166-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3152+
let mut additional_update = self.build_commitment_no_status_check(logger);
3153+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
31673154
// strictly increasing by one, so decrement it here.
31683155
self.latest_monitor_update_id = monitor_update.update_id;
31693156
monitor_update.updates.append(&mut additional_update.updates);
31703157
}
31713158
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
31723159
log_bytes!(self.channel_id));
3173-
return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned())));
3160+
self.pending_monitor_updates.push(monitor_update);
3161+
return Ok(self.pending_monitor_updates.last().unwrap());
31743162
}
31753163

3176-
let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
3164+
let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
31773165
// If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok -
31783166
// we'll send one right away when we get the revoke_and_ack when we
31793167
// free_holding_cell_htlcs().
3180-
let (msg, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
3181-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3168+
let mut additional_update = self.build_commitment_no_status_check(logger);
3169+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
31823170
// strictly increasing by one, so decrement it here.
31833171
self.latest_monitor_update_id = monitor_update.update_id;
31843172
monitor_update.updates.append(&mut additional_update.updates);
3185-
Some(msg)
3186-
} else { None };
3173+
true
3174+
} else { false };
31873175

31883176
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
3189-
log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" });
3190-
3191-
Ok((msgs::RevokeAndACK {
3192-
channel_id: self.channel_id,
3193-
per_commitment_secret,
3194-
next_per_commitment_point,
3195-
}, commitment_signed, monitor_update))
3177+
log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
3178+
self.pending_monitor_updates.push(monitor_update);
3179+
self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
3180+
return Ok(self.pending_monitor_updates.last().unwrap());
31963181
}
31973182

31983183
/// Public version of the below, checking relevant preconditions first.
@@ -3324,7 +3309,7 @@ impl<Signer: Sign> Channel<Signer> {
33243309
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
33253310
/// generating an appropriate error *after* the channel state has been updated based on the
33263311
/// revoke_and_ack message.
3327-
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<RAAUpdates, ChannelError>
3312+
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
33283313
where L::Target: Logger,
33293314
{
33303315
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3511,8 +3496,8 @@ impl<Signer: Sign> Channel<Signer> {
35113496
// When the monitor updating is restored we'll call get_last_commitment_update(),
35123497
// which does not update state, but we're definitely now awaiting a remote revoke
35133498
// before we can step forward any more, so set it here.
3514-
let (_, mut additional_update) = self.send_commitment_no_status_check(logger)?;
3515-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3499+
let mut additional_update = self.build_commitment_no_status_check(logger);
3500+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
35163501
// strictly increasing by one, so decrement it here.
35173502
self.latest_monitor_update_id = monitor_update.update_id;
35183503
monitor_update.updates.append(&mut additional_update.updates);
@@ -3521,12 +3506,8 @@ impl<Signer: Sign> Channel<Signer> {
35213506
self.monitor_pending_failures.append(&mut revoked_htlcs);
35223507
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
35233508
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
3524-
return Ok(RAAUpdates {
3525-
commitment_update: None, finalized_claimed_htlcs: Vec::new(),
3526-
accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
3527-
monitor_update,
3528-
holding_cell_failed_htlcs: Vec::new()
3529-
});
3509+
self.pending_monitor_updates.push(monitor_update);
3510+
return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
35303511
}
35313512

35323513
match self.free_holding_cell_htlcs(logger)? {
@@ -3545,47 +3526,29 @@ impl<Signer: Sign> Channel<Signer> {
35453526
self.latest_monitor_update_id = monitor_update.update_id;
35463527
monitor_update.updates.append(&mut additional_update.updates);
35473528

3548-
Ok(RAAUpdates {
3549-
commitment_update: Some(commitment_update),
3550-
finalized_claimed_htlcs,
3551-
accepted_htlcs: to_forward_infos,
3552-
failed_htlcs: revoked_htlcs,
3553-
monitor_update,
3554-
holding_cell_failed_htlcs: htlcs_to_fail
3555-
})
3529+
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3530+
self.pending_monitor_updates.push(monitor_update);
3531+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
35563532
},
35573533
(None, htlcs_to_fail) => {
35583534
if require_commitment {
3559-
let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
3535+
let mut additional_update = self.build_commitment_no_status_check(logger);
35603536

3561-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3537+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
35623538
// strictly increasing by one, so decrement it here.
35633539
self.latest_monitor_update_id = monitor_update.update_id;
35643540
monitor_update.updates.append(&mut additional_update.updates);
35653541

35663542
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
35673543
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
3568-
Ok(RAAUpdates {
3569-
commitment_update: Some(msgs::CommitmentUpdate {
3570-
update_add_htlcs: Vec::new(),
3571-
update_fulfill_htlcs: Vec::new(),
3572-
update_fail_htlcs,
3573-
update_fail_malformed_htlcs,
3574-
update_fee: None,
3575-
commitment_signed
3576-
}),
3577-
finalized_claimed_htlcs,
3578-
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
3579-
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
3580-
})
3544+
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3545+
self.pending_monitor_updates.push(monitor_update);
3546+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
35813547
} else {
35823548
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
3583-
Ok(RAAUpdates {
3584-
commitment_update: None,
3585-
finalized_claimed_htlcs,
3586-
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
3587-
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
3588-
})
3549+
self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3550+
self.pending_monitor_updates.push(monitor_update);
3551+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
35893552
}
35903553
}
35913554
}
@@ -3765,6 +3728,7 @@ impl<Signer: Sign> Channel<Signer> {
37653728
pub fn monitor_updating_restored<L: Deref>(&mut self, logger: &L, node_pk: PublicKey, genesis_block_hash: BlockHash, user_config: &UserConfig, best_block_height: u32) -> MonitorRestoreUpdates where L::Target: Logger {
37663729
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
37673730
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
3731+
self.pending_monitor_updates.clear();
37683732

37693733
// If we're past (or at) the FundingSent stage on an outbound channel, try to
37703734
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we

0 commit comments

Comments
 (0)