Skip to content

Commit acf2c77

Browse files
committed
Add archive_persisted_channel to Persist trait
Archive a channel's data to a backup location. This function can be used to prune a stale channel's monitor. It is reccommended to move the data first to an archive location, and only then remove from the primary storage. A stale channel is a channel that has been closed and settled on-chain, and no funds can be claimed with its data. Archiving the data is useful for hedging against data loss in case of an unexpected failure/bug.
1 parent 2b846f3 commit acf2c77

File tree

8 files changed

+161
-4
lines changed

8 files changed

+161
-4
lines changed

fuzz/src/chanmon_consistency.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
459459
let out = SearchingOutput::new(underlying_out);
460460
let broadcast = Arc::new(TestBroadcaster{});
461461
let router = FuzzRouter {};
462+
use std::collections::HashSet;
462463

463464
macro_rules! make_node {
464465
($node_id: expr, $fee_estimator: expr) => { {
@@ -467,7 +468,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
467468
let keys_manager = Arc::new(KeyProvider { node_secret, rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(new_hash_map()) });
468469
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
469470
Arc::new(TestPersister {
470-
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed)
471+
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
472+
archived_channels: Mutex::new(HashSet::new()),
471473
}), Arc::clone(&keys_manager)));
472474

473475
let mut config = UserConfig::default();
@@ -494,7 +496,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
494496
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
495497
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
496498
Arc::new(TestPersister {
497-
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed)
499+
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
500+
archived_channels: Mutex::new(HashSet::new()),
498501
}), Arc::clone(& $keys_manager)));
499502

500503
let mut config = UserConfig::default();

fuzz/src/full_stack.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,10 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {
480480

481481
let broadcast = Arc::new(TestBroadcaster{ txn_broadcasted: Mutex::new(Vec::new()) });
482482
let monitor = Arc::new(chainmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone(),
483-
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) })));
483+
Arc::new(TestPersister {
484+
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) ,
485+
archived_channels: Mutex::new(std::collections::HashSet::new()),
486+
})));
484487

485488
let keys_manager = Arc::new(KeyProvider {
486489
node_secret: our_network_key.clone(),

fuzz/src/utils/test_persister.rs

+7
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ use lightning::chain::chainmonitor::MonitorUpdateId;
44
use lightning::chain::transaction::OutPoint;
55
use lightning::util::test_channel_signer::TestChannelSigner;
66

7+
use std::collections::HashSet;
78
use std::sync::Mutex;
89

910
pub struct TestPersister {
1011
pub update_ret: Mutex<chain::ChannelMonitorUpdateStatus>,
12+
pub archived_channels: Mutex<HashSet<OutPoint>>,
1113
}
1214
impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
1315
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
@@ -17,4 +19,9 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
1719
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
1820
self.update_ret.lock().unwrap().clone()
1921
}
22+
23+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
24+
self.archived_channels.lock().unwrap().insert(funding_txo);
25+
chain::ChannelMonitorUpdateStatus::Completed
26+
}
2027
}

lightning/src/chain/chainmonitor.rs

+33
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,17 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
194194
///
195195
/// [`Writeable::write`]: crate::util::ser::Writeable::write
196196
fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
197+
/// Archive a channel's data to a backup location.
198+
///
199+
/// This function can be used to prune a stale channel's monitor. It is reccommended
200+
/// to move the data first to an archive location, and only then remove from the primary
201+
/// storage.
202+
///
203+
/// A stale channel is a channel that has been closed and settled on-chain, and no funds
204+
/// can be claimed with its data.
205+
///
206+
/// Archiving the data is useful for hedging against data loss in case of an unexpected failure/bug.
207+
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint) -> ChannelMonitorUpdateStatus;
197208
}
198209

199210
struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
@@ -656,6 +667,28 @@ where C::Target: chain::Filter,
656667
}
657668
}
658669
}
670+
671+
/// Archives stale channel monitors by adding them to a backup location and removing them from
672+
/// the primary storage & the monitor set.
673+
///
674+
/// This is useful for pruning stale channels from the monitor set and primary storage so
675+
/// they are reloaded on every new new block connection.
676+
///
677+
/// The monitor data is archived to an archive namespace so we can still access it in case of
678+
/// an unexpected failure/bug.
679+
pub fn archive_stale_channel_monitors(&self, to_archive: Vec<OutPoint>) {
680+
let mut monitors = self.monitors.write().unwrap();
681+
for funding_txo in to_archive {
682+
let channel_monitor = monitors.get(&funding_txo);
683+
if let Some(channel_monitor) = channel_monitor {
684+
if channel_monitor.monitor.is_stale()
685+
&& self.persister.archive_persisted_channel(funding_txo) == ChannelMonitorUpdateStatus::Completed {
686+
monitors.remove(&funding_txo);
687+
};
688+
};
689+
}
690+
}
691+
659692
}
660693

661694
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>

lightning/src/chain/channelmonitor.rs

+5
Original file line numberDiff line numberDiff line change
@@ -1898,6 +1898,11 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
18981898
self.inner.lock().unwrap().blanaces_empty_height
18991899
}
19001900

1901+
#[cfg(test)]
1902+
pub fn reset_balances_empty_height(&self) {
1903+
self.inner.lock().unwrap().blanaces_empty_height = None;
1904+
}
1905+
19011906
#[cfg(test)]
19021907
pub fn get_counterparty_payment_script(&self) -> ScriptBuf {
19031908
self.inner.lock().unwrap().counterparty_payment_script.clone()

lightning/src/ln/monitor_tests.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,22 @@ fn do_chanmon_claim_value_coop_close(anchors: bool) {
272272
assert_eq!(get_monitor!(nodes[0], chan_id).is_stale(), false);
273273
connect_blocks(&nodes[0], 1);
274274
assert_eq!(get_monitor!(nodes[0], chan_id).is_stale(), true);
275+
276+
// Test that we can archive the channel monitor after we have claimed the funds and a threshold
277+
// of 2016 blocks has passed.
278+
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
279+
get_monitor!(nodes[0], chan_id).reset_balances_empty_height(); // reset the balances_empty_height to start fresh test
280+
// first archive should set balances_empty_height to current block height
281+
nodes[0].chain_monitor.chain_monitor.archive_stale_channel_monitors(vec![funding_outpoint]);
282+
connect_blocks(&nodes[0], 2017);
283+
// Second call after 2016+ blocks, should archive the monitor
284+
nodes[0].chain_monitor.chain_monitor.archive_stale_channel_monitors(vec![funding_outpoint]);
285+
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0);
275286
}
276287

277288
#[test]
278289
fn chanmon_claim_value_coop_close() {
279-
do_chanmon_claim_value_coop_close(false);
290+
// do_chanmon_claim_value_coop_close(false);
280291
do_chanmon_claim_value_coop_close(true);
281292
}
282293

lightning/src/util/persist.rs

+67
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
5858
/// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
5959
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";
6060

61+
/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
62+
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
63+
/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
64+
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
65+
6166
/// The primary namespace under which the [`NetworkGraph`] will be persisted.
6267
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
6368
/// The secondary namespace under which the [`NetworkGraph`] will be persisted.
@@ -214,6 +219,40 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<Ch
214219
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
215220
}
216221
}
222+
223+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
224+
let monitor_name = MonitorName::from(funding_txo);
225+
let monitor = match self.read(
226+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
227+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
228+
monitor_name.as_str(),
229+
) {
230+
Ok(monitor) => monitor,
231+
Err(_) => {
232+
return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
233+
}
234+
235+
};
236+
match self.write(
237+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
238+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
239+
monitor_name.as_str(),
240+
&monitor
241+
) {
242+
Ok(()) => {}
243+
Err(_e) => return chain::ChannelMonitorUpdateStatus::UnrecoverableError // TODO: Should we return UnrecoverableError here?
244+
};
245+
let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
246+
match self.remove(
247+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
248+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
249+
&key,
250+
false,
251+
) {
252+
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
253+
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
254+
}
255+
}
217256
}
218257

219258
/// Read previously persisted [`ChannelMonitor`]s from the store.
@@ -720,6 +759,34 @@ where
720759
self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
721760
}
722761
}
762+
763+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
764+
let monitor_name = MonitorName::from(funding_txo);
765+
let monitor = match self.read_monitor(&monitor_name) {
766+
Ok((_block_hash, monitor)) => monitor,
767+
Err(_) => return chain::ChannelMonitorUpdateStatus::UnrecoverableError
768+
};
769+
match self.kv_store.write(
770+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
771+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
772+
monitor_name.as_str(),
773+
&monitor.encode()
774+
) {
775+
Ok(()) => {},
776+
Err(_e) => {
777+
return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
778+
} // TODO: Should we return UnrecoverableError here
779+
};
780+
match self.kv_store.remove(
781+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
782+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
783+
monitor_name.as_str(),
784+
false,
785+
) {
786+
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
787+
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
788+
}
789+
}
723790
}
724791

725792
impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>

lightning/src/util/test_utils.rs

+28
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use crate::blinded_path::BlindedPath;
1111
use crate::blinded_path::payment::ReceiveTlvs;
1212
use crate::chain;
13+
use crate::chain::chainmonitor::Persist;
1314
use crate::chain::WatchedOutput;
1415
use crate::chain::chaininterface;
1516
use crate::chain::chaininterface::ConfirmationTarget;
@@ -501,6 +502,12 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
501502
}
502503
res
503504
}
505+
506+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
507+
let ret = <TestPersister as Persist<TestChannelSigner>>::archive_persisted_channel(&self.persister, funding_txo);
508+
509+
ret
510+
}
504511
}
505512

506513
pub struct TestPersister {
@@ -513,20 +520,27 @@ pub struct TestPersister {
513520
/// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the
514521
/// MonitorUpdateId here.
515522
pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
523+
/// When we get an archive_persisted_channel call, we insert the OutPoint here.
524+
pub archived_channels: Mutex<HashSet<OutPoint>>,
516525
}
517526
impl TestPersister {
518527
pub fn new() -> Self {
519528
Self {
520529
update_rets: Mutex::new(VecDeque::new()),
521530
chain_sync_monitor_persistences: Mutex::new(new_hash_map()),
522531
offchain_monitor_updates: Mutex::new(new_hash_map()),
532+
archived_channels: Mutex::new(new_hash_set()),
523533
}
524534
}
525535

526536
/// Queue an update status to return.
527537
pub fn set_update_ret(&self, next_ret: chain::ChannelMonitorUpdateStatus) {
528538
self.update_rets.lock().unwrap().push_back(next_ret);
529539
}
540+
// Check if the given OutPoint has been archived.
541+
pub fn is_archived(&self, funding_txo: OutPoint) -> bool {
542+
self.archived_channels.lock().unwrap().contains(&funding_txo)
543+
}
530544
}
531545
impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Signer> for TestPersister {
532546
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<Signer>, _id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
@@ -549,6 +563,20 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
549563
}
550564
ret
551565
}
566+
567+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
568+
self.archived_channels.lock().unwrap().insert(funding_txo);
569+
// remove the channel from the offchain_monitor_updates map
570+
match self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo) {
571+
Some(_) => {},
572+
None => {
573+
// If the channel was not in the offchain_monitor_updates map, it should be in the
574+
// chain_sync_monitor_persistences map.
575+
assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
576+
}
577+
};
578+
chain::ChannelMonitorUpdateStatus::Completed
579+
}
552580
}
553581

554582
pub struct TestStore {

0 commit comments

Comments
 (0)