Pass MonitorUpdates by ref and tweak manager lockorder #1957

Merged
4 changes: 2 additions & 2 deletions fuzz/src/chanmon_consistency.rs
@@ -152,15 +152,15 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
self.chain_monitor.watch_channel(funding_txo, monitor)
}

-fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
+fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
let mut map_lock = self.latest_monitors.lock().unwrap();
let mut map_entry = match map_lock.entry(funding_txo) {
hash_map::Entry::Occupied(entry) => entry,
hash_map::Entry::Vacant(_) => panic!("Didn't have monitor on update call"),
};
let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::
read(&mut Cursor::new(&map_entry.get().1), (&*self.keys, &*self.keys)).unwrap().1;
-deserialized_monitor.update_monitor(&update, &&TestBroadcaster{}, &FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
+deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
map_entry.insert((update.update_id, ser.0));
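Caller-side, passing the update by reference means it no longer has to be cloned to hand the same update to more than one `chain::Watch` implementation, and the caller keeps ownership afterwards. A minimal sketch, with hypothetical names (none of these values are from this PR):

```rust
// Sketch only: `alice_watchtower` and `bob_watchtower` stand in for any
// two `chain::Watch` implementations, `funding_txo` for a channel's
// funding outpoint, and `update` for an owned `ChannelMonitorUpdate`.
let _status_a = alice_watchtower.update_channel(funding_txo, &update);
let _status_b = bob_watchtower.update_channel(funding_txo, &update);
// No clone was needed and ownership was never given away, so the update
// is still usable here, e.g. to record the id that was just handed off.
let _last_handed_off_id = update.update_id;
```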
2 changes: 1 addition & 1 deletion fuzz/src/utils/test_persister.rs
@@ -14,7 +14,7 @@ impl chainmonitor::Persist<EnforcingSigner> for TestPersister {
self.update_ret.lock().unwrap().clone()
}

-fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &Option<channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
self.update_ret.lock().unwrap().clone()
}
}
12 changes: 6 additions & 6 deletions lightning/src/chain/chainmonitor.rs
@@ -144,7 +144,7 @@ pub trait Persist<ChannelSigner: Sign> {
/// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
-fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
}

struct MonitorHolder<ChannelSigner: Sign> {
@@ -294,7 +294,7 @@ where C::Target: chain::Filter,
}

log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::PermanentFailure => {
@@ -646,7 +646,7 @@ where C::Target: chain::Filter,

/// Note that we persist the given `ChannelMonitor` update while holding the
/// `ChainMonitor` monitors lock.
-fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
+fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
// Update the monitor that watches the channel referred to by the given outpoint.
let monitors = self.monitors.read().unwrap();
match monitors.get(&funding_txo) {
@@ -664,15 +664,15 @@
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
-let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger);
+let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger);
if update_res.is_err() {
log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor));
}
// Even if updating the monitor returns an error, the monitor's state will
// still be changed. So, persist the updated monitor despite the error.
-let update_id = MonitorUpdateId::from_monitor_update(&update);
+let update_id = MonitorUpdateId::from_monitor_update(update);
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
+let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id);
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);
2 changes: 1 addition & 1 deletion lightning/src/chain/mod.rs
@@ -312,7 +312,7 @@ pub trait Watch<ChannelSigner: Sign> {
/// [`ChannelMonitorUpdateStatus`] for invariants around returning an error.
///
/// [`update_monitor`]: channelmonitor::ChannelMonitor::update_monitor
-fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus;
+fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus;

/// Returns any monitor events since the last call. Subsequent calls must only return new
/// events.
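For downstream `chain::Watch` implementors the migration is mechanical: accept the update by reference and clone only where an owned copy is actually stored. A hedged sketch of just the changed method; the `queued_updates` field and `inner_monitor` delegate are invented for illustration, and the other trait methods are elided:

```rust
fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
	// Clone only at the point an owned copy is actually kept around...
	self.queued_updates.lock().unwrap().push(update.clone());
	// ...and forward the borrow everywhere else, avoiding extra copies.
	self.inner_monitor.update_channel(funding_txo, update)
}
```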
4 changes: 2 additions & 2 deletions lightning/src/ln/chanmon_update_fail_tests.rs
@@ -147,9 +147,9 @@ fn test_monitor_and_persister_update_fail() {
// Check that even though the persister is returning an InProgress,
// because the update is bogus, ultimately the error that's returned
// should be a PermanentFailure.
-if let ChannelMonitorUpdateStatus::PermanentFailure = chain_mon.chain_monitor.update_channel(outpoint, update.clone()) {} else { panic!("Expected monitor error to be permanent"); }
+if let ChannelMonitorUpdateStatus::PermanentFailure = chain_mon.chain_monitor.update_channel(outpoint, &update) {} else { panic!("Expected monitor error to be permanent"); }
logger.assert_log_regex("lightning::chain::chainmonitor".to_string(), regex::Regex::new("Persistence of ChannelMonitorUpdate for channel [0-9a-f]* in progress").unwrap(), 1);
-assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, update), ChannelMonitorUpdateStatus::Completed);
+assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
} else { assert!(false); }
}

46 changes: 25 additions & 21 deletions lightning/src/ln/channelmanager.rs
@@ -577,13 +577,13 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
// | |
// | |__`pending_intercepted_htlcs`
// |
-// |__`pending_inbound_payments`
+// |__`per_peer_state`
// | |
-// | |__`claimable_payments`
-// | |
-// | |__`pending_outbound_payments` // This field's struct contains a map of pending outbounds
+// | |__`pending_inbound_payments`
+// | |
+// | |__`claimable_payments`
// | |
-// | |__`per_peer_state`
+// | |__`pending_outbound_payments` // This field's struct contains a map of pending outbounds
// | |
// | |__`peer_state`
// | |
@@ -1709,7 +1709,7 @@ where

// Update the monitor with the shutdown script if necessary.
if let Some(monitor_update) = monitor_update {
-let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update);
+let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
let (result, is_permanent) =
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
if is_permanent {
@@ -1807,7 +1807,7 @@ where
// force-closing. The monitor update on the required in-memory copy should broadcast
// the latest local state, which is the best we can do anyway. Thus, it is safe to
// ignore the result here.
-let _ = self.chain_monitor.update_channel(funding_txo, monitor_update);
+let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
}
}

@@ -2336,7 +2336,7 @@ where
chan)
} {
Some((update_add, commitment_signed, monitor_update)) => {
-let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update);
+let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
let chan_id = chan.get().channel_id();
match (update_err,
handle_monitor_update_res!(self, update_err, chan,
@@ -3284,7 +3284,7 @@ where
BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => {
// The channel has already been closed, so no use bothering to care about the
// monitor updating completing.
-let _ = self.chain_monitor.update_channel(funding_txo, update);
+let _ = self.chain_monitor.update_channel(funding_txo, &update);
},
}
}
@@ -3570,9 +3570,12 @@ where
// Ensure that no peer state channel storage lock is held when calling this
// function.
// This ensures that future code doesn't introduce a lock_order requirement for
-// `forward_htlcs` to be locked after the `per_peer_state` locks, which calling this
-// function with the `per_peer_state` acquired would.
-assert!(self.per_peer_state.try_write().is_ok());
+// `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
+// this function with any `per_peer_state` peer lock acquired would.
+let per_peer_state = self.per_peer_state.read().unwrap();
+for (_, peer) in per_peer_state.iter() {
+	assert!(peer.try_lock().is_ok());
+}
}

//TODO: There is a timing attack here where if a node fails an HTLC back to us they can
@@ -3807,7 +3810,7 @@ where
match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
Ok(msgs_monitor_option) => {
if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
-match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
+match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
ChannelMonitorUpdateStatus::Completed => {},
e => {
log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
@@ -3844,7 +3847,7 @@ where
}
},
Err((e, monitor_update)) => {
-match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
+match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
ChannelMonitorUpdateStatus::Completed => {},
e => {
// TODO: This needs to be handled somehow - if we receive a monitor update
@@ -3880,7 +3883,7 @@ where
};
// We update the ChannelMonitor on the backward link, after
// receiving an `update_fulfill_htlc` from the forward link.
-let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, preimage_update);
+let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
if update_res != ChannelMonitorUpdateStatus::Completed {
// TODO: This needs to be handled somehow - if we receive a monitor update
// with a preimage we *must* somehow manage to propagate it to the upstream
@@ -4449,7 +4452,7 @@ where

// Update the monitor with the shutdown script if necessary.
if let Some(monitor_update) = monitor_update {
-let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update);
+let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
let (result, is_permanent) =
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
if is_permanent {
@@ -4650,7 +4653,7 @@ where
Err((None, e)) => try_chan_entry!(self, Err(e), chan),
Err((Some(update), e)) => {
assert!(chan.get().is_awaiting_monitor_update());
-let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), update);
+let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &update);
try_chan_entry!(self, Err(e), chan);
unreachable!();
},
Ok(res) => res
};
-let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update);
+let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) {
return Err(e);
}
@@ -4792,7 +4795,7 @@ where
let raa_updates = break_chan_entry!(self,
chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
-let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), raa_updates.monitor_update);
+let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &raa_updates.monitor_update);
if was_paused_for_mon_update {
assert!(update_res != ChannelMonitorUpdateStatus::Completed);
assert!(raa_updates.commitment_update.is_none());
@@ -5097,7 +5100,7 @@ where
));
}
if let Some((commitment_update, monitor_update)) = commitment_opt {
-match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
+match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
ChannelMonitorUpdateStatus::Completed => {
pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id: chan.get_counterparty_node_id(),
@@ -6739,6 +6742,8 @@ where
}
}

+let per_peer_state = self.per_peer_state.write().unwrap();
+
let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
let claimable_payments = self.claimable_payments.lock().unwrap();
let pending_outbound_payments = self.pending_outbound_payments.pending_outbound_payments.lock().unwrap();
@@ -6754,7 +6759,6 @@
htlc_purposes.push(purpose);
}

-let per_peer_state = self.per_peer_state.write().unwrap();
(per_peer_state.len() as u64).write(writer)?;
for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() {
peer_pubkey.write(writer)?;
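The reordering in `write` enforces the lock-order tree documented earlier in this diff: `per_peer_state` is now acquired before the payment-state mutexes, as it must be on every path. A generic illustration of why a single global order matters, using plain `std::sync` rather than LDK types:

```rust
use std::sync::Mutex;

// If one thread locks `a` then `b` while another locks `b` then `a`,
// each can end up holding one lock while waiting forever for the other.
// Fixing a single global order (always `a` before `b`) makes that
// deadly interleaving impossible.
fn sum_in_order(a: &Mutex<u64>, b: &Mutex<u64>) -> u64 {
    let a_guard = a.lock().unwrap(); // first lock in the global order
    let b_guard = b.lock().unwrap(); // second lock, taken while `a` is held
    *a_guard + *b_guard
}
```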
10 changes: 5 additions & 5 deletions lightning/src/ln/functional_tests.rs
@@ -8135,8 +8135,8 @@ fn test_update_err_monitor_lockdown() {
let mut node_0_peer_state_lock;
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
-assert_eq!(watchtower.chain_monitor.update_channel(outpoint, update.clone()), ChannelMonitorUpdateStatus::PermanentFailure);
-assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, update), ChannelMonitorUpdateStatus::Completed);
+assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
+assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
} else { assert!(false); }
}
// Our local monitor is in-sync and hasn't yet processed the timeout
@@ -8230,9 +8230,9 @@ fn test_concurrent_monitor_claim() {
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
// Watchtower Alice should already have seen the block and reject the update
-assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, update.clone()), ChannelMonitorUpdateStatus::PermanentFailure);
-assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, update.clone()), ChannelMonitorUpdateStatus::Completed);
-assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, update), ChannelMonitorUpdateStatus::Completed);
+assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
+assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
+assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
} else { assert!(false); }
}
// Our local monitor is in-sync and hasn't yet processed the timeout
2 changes: 1 addition & 1 deletion lightning/src/util/persist.rs
@@ -94,7 +94,7 @@ impl<ChannelSigner: Sign, K: KVStorePersister> Persist<ChannelSigner> for K {
}
}

-fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
match self.persist(&key, monitor) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
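For `Persist` implementors, `Option<&ChannelMonitorUpdate>` carries the same meaning the old `&Option<ChannelMonitorUpdate>` did: `Some` is an incremental update, `None` a request to re-persist the full monitor (as during the chain-sync path above). A sketch of the branch a custom persister might take; `persist_delta`, `persist_full`, and `MySigner` are hypothetical names, not LDK APIs:

```rust
fn update_persisted_channel(&self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<MySigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
	match update {
		// Incremental path: write only the delta, keyed by its update_id.
		Some(update) => self.persist_delta(funding_txo, update.update_id, update),
		// Full path: `None` means "rewrite the whole monitor".
		None => self.persist_full(funding_txo, monitor),
	}
}
```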
8 changes: 4 additions & 4 deletions lightning/src/util/test_utils.rs
@@ -184,12 +184,12 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> {
self.chain_monitor.watch_channel(funding_txo, new_monitor)
}

-fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
+fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
// Every monitor update should survive roundtrip
let mut w = TestVecWriter(Vec::new());
update.write(&mut w).unwrap();
assert!(channelmonitor::ChannelMonitorUpdate::read(
-	&mut io::Cursor::new(&w.0)).unwrap() == update);
+	&mut io::Cursor::new(&w.0)).unwrap() == *update);

self.monitor_updates.lock().unwrap().entry(funding_txo.to_channel_id()).or_insert(Vec::new()).push(update.clone());

@@ -202,7 +202,7 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> {
}

self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(),
-(funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(&update)));
+(funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(update)));
let update_res = self.chain_monitor.update_channel(funding_txo, update);
// At every point where we get a monitor update, we should be able to send a useful monitor
// to a watchtower and disk...
@@ -254,7 +254,7 @@ impl<Signer: keysinterface::Sign> chainmonitor::Persist<Signer> for TestPersister {
chain::ChannelMonitorUpdateStatus::Completed
}

-fn update_persisted_channel(&self, funding_txo: OutPoint, update: &Option<channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<Signer>, update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+fn update_persisted_channel(&self, funding_txo: OutPoint, update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<Signer>, update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
let mut ret = chain::ChannelMonitorUpdateStatus::Completed;
if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
ret = update_ret;