Skip to content

Commit d40ab06

Browse files
committed
Use FilesystemStore in BackgroundProcessor
We switch our BP over to use `FilesystemStore`, which also gives us test coverage and ensures the compatibility.
1 parent 252adae commit d40ab06

File tree

3 files changed

+74
-52
lines changed

3 files changed

+74
-52
lines changed

lightning-background-processor/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,4 @@ lightning-rapid-gossip-sync = { version = "0.0.116", path = "../lightning-rapid-
2929
tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
3030
lightning = { version = "0.0.116", path = "../lightning", features = ["_test_utils"] }
3131
lightning-invoice = { version = "0.24.0", path = "../lightning-invoice" }
32-
lightning-persister = { version = "0.0.116", path = "../lightning-persister" }
32+
lightning-storage = { version = "0.0.116", path = "../lightning-storage" }

lightning-background-processor/src/lib.rs

+58-39
Original file line numberDiff line numberDiff line change
@@ -496,9 +496,17 @@ use core::task;
496496
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
497497
/// could setup `process_events_async` like this:
498498
/// ```
499-
/// # struct MyPersister {}
500-
/// # impl lightning::util::persist::KVStorePersister for MyPersister {
501-
/// # fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
499+
/// # use lightning::io;
500+
/// # use std::sync::{Arc, Mutex};
501+
/// # use std::sync::atomic::{AtomicBool, Ordering};
502+
/// # use lightning_background_processor::{process_events_async, GossipSync};
503+
/// # struct MyStore {}
504+
/// # impl lightning::util::persist::KVStore for MyStore {
505+
/// # type Reader = io::Cursor<Vec<u8>>;
506+
/// # fn read(&self, namespace: &str, key: &str) -> io::Result<Self::Reader> { Ok(io::Cursor::new(Vec::new())) }
507+
/// # fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
508+
/// # fn remove(&self, namespace: &str, key: &str) -> io::Result<()> { Ok(()) }
509+
/// # fn list(&self, namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
502510
/// # }
503511
/// # struct MyEventHandler {}
504512
/// # impl MyEventHandler {
@@ -510,23 +518,20 @@ use core::task;
510518
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
511519
/// # fn disconnect_socket(&mut self) {}
512520
/// # }
513-
/// # use std::sync::{Arc, Mutex};
514-
/// # use std::sync::atomic::{AtomicBool, Ordering};
515-
/// # use lightning_background_processor::{process_events_async, GossipSync};
516521
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
517522
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
518523
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
519524
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
520525
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
521526
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
522-
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
527+
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
523528
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
524529
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
525530
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
526531
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
527532
/// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
528533
///
529-
/// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
534+
/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
530535
/// let background_persister = Arc::clone(&my_persister);
531536
/// let background_event_handler = Arc::clone(&my_event_handler);
532537
/// let background_chain_mon = Arc::clone(&my_chain_monitor);
@@ -862,8 +867,8 @@ mod tests {
862867
use lightning::util::config::UserConfig;
863868
use lightning::util::ser::Writeable;
864869
use lightning::util::test_utils;
865-
use lightning::util::persist::KVStorePersister;
866-
use lightning_persister::FilesystemPersister;
870+
use lightning::util::persist::{KVStore, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_KEY};
871+
use lightning_storage::fs_store::{FilesystemStore, FilesystemReader};
867872
use std::collections::VecDeque;
868873
use std::{fs, env};
869874
use std::path::PathBuf;
@@ -902,7 +907,7 @@ mod tests {
902907
>,
903908
Arc<test_utils::TestLogger>>;
904909

905-
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
910+
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;
906911

907912
type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
908913
type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
@@ -913,7 +918,7 @@ mod tests {
913918
rapid_gossip_sync: RGS,
914919
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
915920
chain_monitor: Arc<ChainMonitor>,
916-
persister: Arc<FilesystemPersister>,
921+
kv_store: Arc<FilesystemStore>,
917922
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
918923
network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
919924
logger: Arc<test_utils::TestLogger>,
@@ -937,9 +942,9 @@ mod tests {
937942

938943
impl Drop for Node {
939944
fn drop(&mut self) {
940-
let data_dir = self.persister.get_data_dir();
945+
let data_dir = self.kv_store.get_data_dir();
941946
match fs::remove_dir_all(data_dir.clone()) {
942-
Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
947+
Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir.display(), e),
943948
_ => {}
944949
}
945950
}
@@ -950,13 +955,13 @@ mod tests {
950955
graph_persistence_notifier: Option<SyncSender<()>>,
951956
manager_error: Option<(std::io::ErrorKind, &'static str)>,
952957
scorer_error: Option<(std::io::ErrorKind, &'static str)>,
953-
filesystem_persister: FilesystemPersister,
958+
kv_store: FilesystemStore,
954959
}
955960

956961
impl Persister {
957-
fn new(data_dir: String) -> Self {
958-
let filesystem_persister = FilesystemPersister::new(data_dir);
959-
Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
962+
fn new(data_dir: PathBuf) -> Self {
963+
let kv_store = FilesystemStore::new(data_dir);
964+
Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
960965
}
961966

962967
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
@@ -976,15 +981,21 @@ mod tests {
976981
}
977982
}
978983

979-
impl KVStorePersister for Persister {
980-
fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
981-
if key == "manager" {
984+
impl KVStore for Persister {
985+
type Reader = FilesystemReader;
986+
987+
fn read(&self, namespace: &str, key: &str) -> lightning::io::Result<Self::Reader> {
988+
self.kv_store.read(namespace, key)
989+
}
990+
991+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
992+
if namespace == CHANNEL_MANAGER_PERSISTENCE_NAMESPACE && key == CHANNEL_MANAGER_PERSISTENCE_KEY {
982993
if let Some((error, message)) = self.manager_error {
983994
return Err(std::io::Error::new(error, message))
984995
}
985996
}
986997

987-
if key == "network_graph" {
998+
if namespace == NETWORK_GRAPH_PERSISTENCE_NAMESPACE && key == NETWORK_GRAPH_PERSISTENCE_KEY {
988999
if let Some(sender) = &self.graph_persistence_notifier {
9891000
match sender.send(()) {
9901001
Ok(()) => {},
@@ -997,13 +1008,21 @@ mod tests {
9971008
}
9981009
}
9991010

1000-
if key == "scorer" {
1011+
if namespace == SCORER_PERSISTENCE_NAMESPACE && key == SCORER_PERSISTENCE_KEY {
10011012
if let Some((error, message)) = self.scorer_error {
10021013
return Err(std::io::Error::new(error, message))
10031014
}
10041015
}
10051016

1006-
self.filesystem_persister.persist(key, object)
1017+
self.kv_store.write(namespace, key, buf)
1018+
}
1019+
1020+
fn remove(&self, namespace: &str, key: &str) -> lightning::io::Result<()> {
1021+
self.kv_store.remove(namespace, key)
1022+
}
1023+
1024+
fn list(&self, namespace: &str) -> lightning::io::Result<Vec<String>> {
1025+
self.kv_store.list(namespace)
10071026
}
10081027
}
10091028

@@ -1151,10 +1170,10 @@ mod tests {
11511170
let seed = [i as u8; 32];
11521171
let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ()));
11531172
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
1154-
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", &persist_dir, i)));
1173+
let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
11551174
let now = Duration::from_secs(genesis_block.header.time as u64);
11561175
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1157-
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
1176+
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
11581177
let best_block = BestBlock::from_network(network);
11591178
let params = ChainParameters { network, best_block };
11601179
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
@@ -1166,7 +1185,7 @@ mod tests {
11661185
onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
11671186
};
11681187
let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
1169-
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
1188+
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer };
11701189
nodes.push(node);
11711190
}
11721191

@@ -1261,7 +1280,7 @@ mod tests {
12611280
let tx = open_channel!(nodes[0], nodes[1], 100000);
12621281

12631282
// Initiate the background processors to watch each node.
1264-
let data_dir = nodes[0].persister.get_data_dir();
1283+
let data_dir = nodes[0].kv_store.get_data_dir();
12651284
let persister = Arc::new(Persister::new(data_dir));
12661285
let event_handler = |_: _| {};
12671286
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1326,7 +1345,7 @@ mod tests {
13261345
// `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
13271346
// `PeerManager::timer_tick_occurred` every `PING_TIMER`.
13281347
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1329-
let data_dir = nodes[0].persister.get_data_dir();
1348+
let data_dir = nodes[0].kv_store.get_data_dir();
13301349
let persister = Arc::new(Persister::new(data_dir));
13311350
let event_handler = |_: _| {};
13321351
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1353,7 +1372,7 @@ mod tests {
13531372
let (_, nodes) = create_nodes(2, "test_persist_error");
13541373
open_channel!(nodes[0], nodes[1], 100000);
13551374

1356-
let data_dir = nodes[0].persister.get_data_dir();
1375+
let data_dir = nodes[0].kv_store.get_data_dir();
13571376
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
13581377
let event_handler = |_: _| {};
13591378
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1373,7 +1392,7 @@ mod tests {
13731392
let (_, nodes) = create_nodes(2, "test_persist_error_sync");
13741393
open_channel!(nodes[0], nodes[1], 100000);
13751394

1376-
let data_dir = nodes[0].persister.get_data_dir();
1395+
let data_dir = nodes[0].kv_store.get_data_dir();
13771396
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
13781397

13791398
let bp_future = super::process_events_async(
@@ -1399,7 +1418,7 @@ mod tests {
13991418
fn test_network_graph_persist_error() {
14001419
// Test that if we encounter an error during network graph persistence, an error gets returned.
14011420
let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
1402-
let data_dir = nodes[0].persister.get_data_dir();
1421+
let data_dir = nodes[0].kv_store.get_data_dir();
14031422
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
14041423
let event_handler = |_: _| {};
14051424
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1417,7 +1436,7 @@ mod tests {
14171436
fn test_scorer_persist_error() {
14181437
// Test that if we encounter an error during scorer persistence, an error gets returned.
14191438
let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
1420-
let data_dir = nodes[0].persister.get_data_dir();
1439+
let data_dir = nodes[0].kv_store.get_data_dir();
14211440
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
14221441
let event_handler = |_: _| {};
14231442
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1435,7 +1454,7 @@ mod tests {
14351454
fn test_background_event_handling() {
14361455
let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
14371456
let channel_value = 100000;
1438-
let data_dir = nodes[0].persister.get_data_dir();
1457+
let data_dir = nodes[0].kv_store.get_data_dir();
14391458
let persister = Arc::new(Persister::new(data_dir.clone()));
14401459

14411460
// Set up a background event handler for FundingGenerationReady events.
@@ -1508,7 +1527,7 @@ mod tests {
15081527
#[test]
15091528
fn test_scorer_persistence() {
15101529
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
1511-
let data_dir = nodes[0].persister.get_data_dir();
1530+
let data_dir = nodes[0].kv_store.get_data_dir();
15121531
let persister = Arc::new(Persister::new(data_dir));
15131532
let event_handler = |_: _| {};
15141533
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1580,7 +1599,7 @@ mod tests {
15801599
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
15811600

15821601
let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
1583-
let data_dir = nodes[0].persister.get_data_dir();
1602+
let data_dir = nodes[0].kv_store.get_data_dir();
15841603
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
15851604

15861605
let event_handler = |_: _| {};
@@ -1599,7 +1618,7 @@ mod tests {
15991618
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
16001619

16011620
let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
1602-
let data_dir = nodes[0].persister.get_data_dir();
1621+
let data_dir = nodes[0].kv_store.get_data_dir();
16031622
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
16041623

16051624
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
@@ -1739,7 +1758,7 @@ mod tests {
17391758
};
17401759

17411760
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
1742-
let data_dir = nodes[0].persister.get_data_dir();
1761+
let data_dir = nodes[0].kv_store.get_data_dir();
17431762
let persister = Arc::new(Persister::new(data_dir));
17441763
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
17451764

@@ -1772,7 +1791,7 @@ mod tests {
17721791
};
17731792

17741793
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
1775-
let data_dir = nodes[0].persister.get_data_dir();
1794+
let data_dir = nodes[0].kv_store.get_data_dir();
17761795
let persister = Arc::new(Persister::new(data_dir));
17771796

17781797
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());

0 commit comments

Comments
 (0)