Stop cleaning monitor updates on new block connect #2779

Merged
merged 1 commit on Dec 15, 2023
Changes from all commits
171 changes: 82 additions & 89 deletions lightning/src/util/persist.rs
@@ -346,9 +346,10 @@ where
///
/// # Pruning stale channel updates
///
/// Stale updates are pruned when a full monitor is written. The old monitor is first read, and if
/// that succeeds, updates in the range between the old and new monitors are deleted. The `lazy`
/// flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
/// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates`
/// are deleted.
/// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
/// will complete. However, stale updates are not a problem for data integrity, since updates are
/// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`.
///
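As a rough, hedged illustration of the range arithmetic described above (a standalone sketch using an illustrative threshold of 5; it mirrors the logic added later in this diff rather than copying it):

```rust
// Illustrative sketch only: computes the inclusive range of stale update ids
// that become prunable once a full monitor is written at `latest_update_id`.
fn prunable_range(latest_update_id: u64, maximum_pending_updates: u64) -> (u64, u64) {
    let end = latest_update_id;
    let start = end.saturating_sub(maximum_pending_updates);
    (start, end) // updates `start..=end` are removed with lazy deletes
}

fn main() {
    // With a threshold of 5, consolidating at update_id 10 prunes updates 5 through 10.
    assert_eq!(prunable_range(10, 5), (5, 10));
}
```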
@@ -610,24 +611,6 @@ where
) -> chain::ChannelMonitorUpdateStatus {
// Determine the proper key for this monitor
let monitor_name = MonitorName::from(funding_txo);
let maybe_old_monitor = self.read_monitor(&monitor_name);
match maybe_old_monitor {
Ok((_, ref old_monitor)) => {
// Check that this key isn't already storing a monitor with a higher update_id
Contributor Author:
Removed this check since we no longer read old_monitor when persisting a full monitor.

Collaborator:
Yeah, I think it's fine. I'd previously suggested doing it here, but really ChainMonitor should handle it for us.

// (collision)
if old_monitor.get_latest_update_id() > monitor.get_latest_update_id() {
log_error!(
self.logger,
"Tried to write a monitor at the same outpoint {} with a higher update_id!",
monitor_name.as_str()
);
return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
}
}
// This means the channel monitor is new.
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
_ => return chain::ChannelMonitorUpdateStatus::UnrecoverableError,
Contributor Author:
We no longer fail in this case.

}
// Serialize and write the new monitor
let mut monitor_bytes = Vec::with_capacity(
MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
@@ -641,65 +624,12 @@
&monitor_bytes,
) {
Ok(_) => {
// Assess cleanup. Typically, we'll clean up only between the last two known full
// monitors.

Contributor Author: Moved the cleanup logic to update_persisted_channel, so we only clean up on consolidation triggered by maximum_pending_updates.
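As a hedged, standalone sketch of the consolidation trigger this comment refers to (illustrative only; it mirrors the modulus check exercised in the tests below, not the PR's exact code):

```rust
// Illustrative only: an update id that is a multiple of maximum_pending_updates
// triggers a full monitor write, after which the pending updates behind it can
// be cleaned up; other update ids are persisted as individual updates.
fn triggers_consolidation(update_id: u64, maximum_pending_updates: u64) -> bool {
    update_id % maximum_pending_updates == 0
}

fn main() {
    assert!(triggers_consolidation(6, 3));
    assert!(!triggers_consolidation(7, 3));
}
```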
if let Ok((_, old_monitor)) = maybe_old_monitor {
let start = old_monitor.get_latest_update_id();
let end = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
// We don't want to clean the rest of u64, so just do possible pending
// updates. Note that we never write updates at
// `CLOSED_CHANNEL_UPDATE_ID`.
cmp::min(
start.saturating_add(self.maximum_pending_updates),
CLOSED_CHANNEL_UPDATE_ID - 1,
)
} else {
monitor.get_latest_update_id().saturating_sub(1)
};
// We should bother cleaning up only if there's at least one update
// expected.
for update_id in start..=end {
let update_name = UpdateName::from(update_id);
#[cfg(debug_assertions)]
{
if let Ok(update) =
self.read_monitor_update(&monitor_name, &update_name)
{
// Assert that we are reading what we think we are.
debug_assert_eq!(update.update_id, update_name.0);
} else if update_id != start && monitor.get_latest_update_id() != CLOSED_CHANNEL_UPDATE_ID
{
// We're deleting something we should know doesn't exist.
panic!(
Contributor Author:
There is a possibility of issuing some extra deletes, since no update is persisted on block connect, but we will still try to clean up an update with that update_id.

"failed to read monitor update {}",
update_name.as_str()
);
}
// On closed channels, we will unavoidably try to read
// non-existent updates since we have to guess at the range of
// stale updates, so do nothing.
}
if let Err(e) = self.kv_store.remove(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
update_name.as_str(),
true,
) {
log_error!(
self.logger,
"error cleaning up channel monitor updates for monitor {}, reason: {}",
monitor_name.as_str(),
e
);
};
}
};
chain::ChannelMonitorUpdateStatus::Completed
}
Err(e) => {
log_error!(
self.logger,
"error writing channel monitor {}/{}/{} reason: {}",
"Failed to write ChannelMonitor {}/{}/{} reason: {}",
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
@@ -741,7 +671,7 @@ where
Err(e) => {
log_error!(
self.logger,
"error writing channel monitor update {}/{}/{} reason: {}",
"Failed to write ChannelMonitorUpdate {}/{}/{} reason: {}",
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
update_name.as_str(),
@@ -751,8 +681,41 @@
}
}
} else {
// We could write this update, but it meets criteria of our design that call for a full monitor write.
self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
let monitor_name = MonitorName::from(funding_txo);
// In case of channel-close monitor update, we need to read old monitor before persisting
// the new one in order to determine the cleanup range.
let maybe_old_monitor = match monitor.get_latest_update_id() {
CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
_ => None
};

// We could write this update, but it meets criteria of our design that calls for a full monitor write.
let monitor_update_status = self.persist_new_channel(funding_txo, monitor, monitor_update_call_id);

if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
// If there is an error while reading old monitor, we skip clean up.
maybe_old_monitor.map(|(_, ref old_monitor)| {
let start = old_monitor.get_latest_update_id();
// We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
let end = cmp::min(
start.saturating_add(self.maximum_pending_updates),
CLOSED_CHANNEL_UPDATE_ID - 1,
);
(start, end)
})
} else {
let end = monitor.get_latest_update_id();
let start = end.saturating_sub(self.maximum_pending_updates);
Some((start, end))
};

if let Some((start, end)) = cleanup_range {
self.cleanup_in_range(monitor_name, start, end);
}
}

monitor_update_status
}
} else {
// There is no update given, so we must persist a new monitor.
@@ -761,6 +724,34 @@
}
}

impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
where
ES::Target: EntropySource + Sized,
K::Target: KVStore,
L::Target: Logger,
SP::Target: SignerProvider + Sized
{
// Cleans up monitor updates for given monitor in range `start..=end`.
fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
Contributor:
Non-blocking nit: fine for now, but if we touch this again we may want to change the semantics of cleanup_in_range. I already brought this up in the original PR, but usually start..end in Rust has the semantics start <= x < end (cf. https://doc.rust-lang.org/std/ops/struct.Range.html), so it may be a bit unexpected for a reader to have this work as start..=end. If we want to keep this version we could at least rename the variables to first/last to be a bit clearer and to show that this is unconventional behavior.

Contributor Author:
I didn't want to complicate the semantics of calculating cleanup_range by changing the end calculation. It probably makes sense to rename it to last (but that's not super critical for now, since it is a private function).
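
For readers less familiar with the distinction being discussed, a minimal illustration of exclusive vs. inclusive ranges in Rust (not part of the diff):

```rust
fn main() {
    // Exclusive range: start <= x < end, the usual Rust convention.
    let exclusive: Vec<u64> = (3..6).collect();
    assert_eq!(exclusive, vec![3, 4, 5]);

    // Inclusive range: start <= x <= end, which is how cleanup_in_range treats `end`.
    let inclusive: Vec<u64> = (3..=6).collect();
    assert_eq!(inclusive, vec![3, 4, 5, 6]);
}
```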

for update_id in start..=end {
let update_name = UpdateName::from(update_id);
if let Err(e) = self.kv_store.remove(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
update_name.as_str(),
true,
) {
log_error!(
self.logger,
"Failed to clean up channel monitor updates for monitor {}, reason: {}",
monitor_name.as_str(),
e
);
};
}
}
}

/// A struct representing a name for a monitor.
#[derive(Debug)]
struct MonitorName(String);
@@ -896,20 +887,21 @@ mod tests {
#[test]
fn persister_with_real_monitors() {
// This value is used later to limit how many iterations we perform.
let test_max_pending_updates = 7;
let persister_0_max_pending_updates = 7;
// Intentionally set this to a smaller value to test a different alignment.
let persister_1_max_pending_updates = 3;
let chanmon_cfgs = create_chanmon_cfgs(4);
let persister_0 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates,
maximum_pending_updates: persister_0_max_pending_updates,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
};
let persister_1 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
// Intentionally set this to a smaller value to test a different alignment.
maximum_pending_updates: 3,
maximum_pending_updates: persister_1_max_pending_updates,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
};
@@ -934,7 +926,6 @@
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;

@@ -957,10 +948,11 @@
for (_, mon) in persisted_chan_data_0.iter() {
// check that when we read it, we got the right update id
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
// if the CM is at the correct update id without updates, ensure no updates are stored

// if the CM is at consolidation threshold, ensure no updates are stored.
let monitor_name = MonitorName::from(mon.get_funding_txo().0);
let (_, cm_0) = persister_0.read_monitor(&monitor_name).unwrap();
if cm_0.get_latest_update_id() == $expected_update_id {
if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
assert_eq!(
persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()).unwrap().len(),
@@ -975,8 +967,9 @@
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = MonitorName::from(mon.get_funding_txo().0);
let (_, cm_1) = persister_1.read_monitor(&monitor_name).unwrap();
if cm_1.get_latest_update_id() == $expected_update_id {
// if the CM is at consolidation threshold, ensure no updates are stored.
if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
assert_eq!(
persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()).unwrap().len(),
@@ -1001,7 +994,7 @@
// Send a few more payments to try all the alignments of max pending updates with
// updates for a payment sent and received.
let mut sender = 0;
for i in 3..=test_max_pending_updates * 2 {
for i in 3..=persister_0_max_pending_updates * 2 {
let receiver;
if sender == 0 {
sender = 1;