Skip to content

Commit 7e9d008

Browse files
implement Persist and Persister with generic KVStorePersister trait
1 parent 83595db commit 7e9d008

File tree

6 files changed

+168
-176
lines changed

6 files changed

+168
-176
lines changed

lightning-background-processor/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ rustdoc-args = ["--cfg", "docsrs"]
1616
[dependencies]
1717
bitcoin = "0.27"
1818
lightning = { version = "0.0.106", path = "../lightning", features = ["std"] }
19-
lightning-persister = { version = "0.0.106", path = "../lightning-persister" }
2019

2120
[dev-dependencies]
2221
lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] }
2322
lightning-invoice = { version = "0.14.0", path = "../lightning-invoice" }
23+
lightning-persister = { version = "0.0.106", path = "../lightning-persister" }

lightning-background-processor/src/lib.rs

+34-48
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescr
2020
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
2121
use lightning::util::events::{Event, EventHandler, EventsProvider};
2222
use lightning::util::logger::Logger;
23+
use lightning::util::persist::Persister;
2324
use std::sync::Arc;
2425
use std::sync::atomic::{AtomicBool, Ordering};
2526
use std::thread;
@@ -80,22 +81,6 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
8081
#[cfg(test)]
8182
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
8283

83-
/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
84-
pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
85-
where
86-
M::Target: 'static + chain::Watch<Signer>,
87-
T::Target: 'static + BroadcasterInterface,
88-
K::Target: 'static + KeysInterface<Signer = Signer>,
89-
F::Target: 'static + FeeEstimator,
90-
L::Target: 'static + Logger,
91-
{
92-
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
93-
/// (which will cause the [`BackgroundProcessor`] which called this method to exit).
94-
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
95-
96-
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
97-
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
98-
}
9984

10085
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
10186
struct DecoratingEventHandler<
@@ -138,12 +123,12 @@ impl BackgroundProcessor {
138123
///
139124
/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
140125
/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
141-
/// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's
126+
/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
142127
/// provided implementation.
143128
///
144129
/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
145-
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
146-
/// for Rust-Lightning's provided implementation.
130+
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate
131+
/// for LDK's provided implementation.
147132
///
148133
/// Typically, users should either implement [`Persister::persist_manager`] to never return an
149134
/// error or call [`join`] and handle any error that may arise. For the latter case,
@@ -161,8 +146,8 @@ impl BackgroundProcessor {
161146
/// [`stop`]: Self::stop
162147
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
163148
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
164-
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
165-
/// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
149+
/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
150+
/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
166151
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
167152
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
168153
pub fn start<
@@ -180,7 +165,7 @@ impl BackgroundProcessor {
180165
CMH: 'static + Deref + Send + Sync,
181166
RMH: 'static + Deref + Send + Sync,
182167
EH: 'static + EventHandler + Send,
183-
PS: 'static + Send + Persister<Signer, CW, T, K, F, L>,
168+
PS: 'static + Deref + Send,
184169
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
185170
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
186171
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
@@ -202,6 +187,7 @@ impl BackgroundProcessor {
202187
CMH::Target: 'static + ChannelMessageHandler,
203188
RMH::Target: 'static + RoutingMessageHandler,
204189
UMH::Target: 'static + CustomMessageHandler,
190+
PS::Target: 'static + Persister<Signer, CW, T, K, F, L>
205191
{
206192
let stop_thread = Arc::new(AtomicBool::new(false));
207193
let stop_thread_clone = stop_thread.clone();
@@ -365,10 +351,11 @@ mod tests {
365351
use lightning::util::logger::Logger;
366352
use lightning::util::ser::Writeable;
367353
use lightning::util::test_utils;
354+
use lightning::util::persist::KVStorePersister;
368355
use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
369356
use lightning_invoice::utils::DefaultRouter;
370357
use lightning_persister::FilesystemPersister;
371-
use std::fs;
358+
use std::fs::{self, File};
372359
use std::ops::Deref;
373360
use std::path::PathBuf;
374361
use std::sync::{Arc, Mutex};
@@ -414,12 +401,14 @@ mod tests {
414401
struct Persister {
415402
data_dir: String,
416403
graph_error: Option<(std::io::ErrorKind, &'static str)>,
417-
manager_error: Option<(std::io::ErrorKind, &'static str)>
404+
manager_error: Option<(std::io::ErrorKind, &'static str)>,
405+
filesystem_persister: FilesystemPersister,
418406
}
419407

420408
impl Persister {
421409
fn new(data_dir: String) -> Self {
422-
Self { data_dir, graph_error: None, manager_error: None }
410+
let filesystem_persister = FilesystemPersister::new(data_dir.clone());
411+
Self { data_dir, graph_error: None, manager_error: None, filesystem_persister }
423412
}
424413

425414
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
@@ -431,25 +420,21 @@ mod tests {
431420
}
432421
}
433422

434-
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
435-
M::Target: 'static + chain::Watch<Signer>,
436-
T::Target: 'static + BroadcasterInterface,
437-
K::Target: 'static + KeysInterface<Signer = Signer>,
438-
F::Target: 'static + FeeEstimator,
439-
L::Target: 'static + Logger,
440-
{
441-
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
442-
match self.manager_error {
443-
None => FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager),
444-
Some((error, message)) => Err(std::io::Error::new(error, message)),
423+
impl KVStorePersister for Persister {
424+
fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
425+
if key == "manager" {
426+
if let Some((error, message)) = self.manager_error {
427+
return Err(std::io::Error::new(error, message))
428+
}
445429
}
446-
}
447430

448-
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
449-
match self.graph_error {
450-
None => FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph),
451-
Some((error, message)) => Err(std::io::Error::new(error, message)),
431+
if key == "network_graph" {
432+
if let Some((error, message)) = self.graph_error {
433+
return Err(std::io::Error::new(error, message))
434+
}
452435
}
436+
437+
self.filesystem_persister.persist(key, object)
453438
}
454439
}
455440

@@ -576,7 +561,7 @@ mod tests {
576561

577562
// Initiate the background processors to watch each node.
578563
let data_dir = nodes[0].persister.get_data_dir();
579-
let persister = Persister::new(data_dir);
564+
let persister = Arc::new(Persister::new(data_dir));
580565
let event_handler = |_: &_| {};
581566
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
582567

@@ -637,7 +622,7 @@ mod tests {
637622
// `FRESHNESS_TIMER`.
638623
let nodes = create_nodes(1, "test_timer_tick_called".to_string());
639624
let data_dir = nodes[0].persister.get_data_dir();
640-
let persister = Persister::new(data_dir);
625+
let persister = Arc::new(Persister::new(data_dir));
641626
let event_handler = |_: &_| {};
642627
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
643628
loop {
@@ -660,7 +645,7 @@ mod tests {
660645
open_channel!(nodes[0], nodes[1], 100000);
661646

662647
let data_dir = nodes[0].persister.get_data_dir();
663-
let persister = Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test");
648+
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
664649
let event_handler = |_: &_| {};
665650
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
666651
match bg_processor.join() {
@@ -677,7 +662,7 @@ mod tests {
677662
// Test that if we encounter an error during network graph persistence, an error gets returned.
678663
let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
679664
let data_dir = nodes[0].persister.get_data_dir();
680-
let persister = Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test");
665+
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
681666
let event_handler = |_: &_| {};
682667
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
683668

@@ -695,7 +680,7 @@ mod tests {
695680
let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
696681
let channel_value = 100000;
697682
let data_dir = nodes[0].persister.get_data_dir();
698-
let persister = Persister::new(data_dir.clone());
683+
let persister = Arc::new(Persister::new(data_dir.clone()));
699684

700685
// Set up a background event handler for FundingGenerationReady events.
701686
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
@@ -726,7 +711,8 @@ mod tests {
726711
// Set up a background event handler for SpendableOutputs events.
727712
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
728713
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
729-
let bg_processor = BackgroundProcessor::start(Persister::new(data_dir), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
714+
let persister = Arc::new(Persister::new(data_dir));
715+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
730716

731717
// Force close the channel and check that the SpendableOutputs event was handled.
732718
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
@@ -752,7 +738,7 @@ mod tests {
752738

753739
// Initiate the background processors to watch each node.
754740
let data_dir = nodes[0].persister.get_data_dir();
755-
let persister = Persister::new(data_dir);
741+
let persister = Arc::new(Persister::new(data_dir));
756742
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
757743
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
758744
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));

lightning-persister/src/lib.rs

+12-84
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,15 @@ extern crate bitcoin;
1515
extern crate libc;
1616

1717
use bitcoin::hash_types::{BlockHash, Txid};
18-
use bitcoin::hashes::hex::{FromHex, ToHex};
19-
use lightning::routing::network_graph::NetworkGraph;
20-
use crate::util::DiskWriteable;
21-
use lightning::chain;
22-
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
23-
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
24-
use lightning::chain::chainmonitor;
18+
use bitcoin::hashes::hex::FromHex;
19+
use lightning::chain::channelmonitor::ChannelMonitor;
2520
use lightning::chain::keysinterface::{Sign, KeysInterface};
26-
use lightning::chain::transaction::OutPoint;
27-
use lightning::ln::channelmanager::ChannelManager;
28-
use lightning::util::logger::Logger;
2921
use lightning::util::ser::{ReadableArgs, Writeable};
22+
use lightning::util::persist::KVStorePersister;
3023
use std::fs;
31-
use std::io::{Cursor, Error, Write};
24+
use std::io::Cursor;
3225
use std::ops::Deref;
33-
use std::path::{Path, PathBuf};
26+
use std::path::{Path, PathBuf, MAIN_SEPARATOR};
3427

3528
/// FilesystemPersister persists channel data on disk, where each channel's
3629
/// data is stored in a file named after its funding outpoint.
@@ -48,31 +41,6 @@ pub struct FilesystemPersister {
4841
path_to_channel_data: String,
4942
}
5043

51-
impl<Signer: Sign> DiskWriteable for ChannelMonitor<Signer> {
52-
fn write_to_file<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
53-
self.write(writer)
54-
}
55-
}
56-
57-
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> DiskWriteable for ChannelManager<Signer, M, T, K, F, L>
58-
where
59-
M::Target: chain::Watch<Signer>,
60-
T::Target: BroadcasterInterface,
61-
K::Target: KeysInterface<Signer=Signer>,
62-
F::Target: FeeEstimator,
63-
L::Target: Logger,
64-
{
65-
fn write_to_file<W: Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
66-
self.write(writer)
67-
}
68-
}
69-
70-
impl DiskWriteable for NetworkGraph {
71-
fn write_to_file<W: Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
72-
self.write(writer)
73-
}
74-
}
75-
7644
impl FilesystemPersister {
7745
/// Initialize a new FilesystemPersister and set the path to the individual channels'
7846
/// files.
@@ -87,43 +55,14 @@ impl FilesystemPersister {
8755
self.path_to_channel_data.clone()
8856
}
8957

90-
pub(crate) fn path_to_monitor_data(&self) -> PathBuf {
91-
let mut path = PathBuf::from(self.path_to_channel_data.clone());
92-
path.push("monitors");
93-
path
94-
}
95-
96-
/// Writes the provided `ChannelManager` to the path provided at `FilesystemPersister`
97-
/// initialization, within a file called "manager".
98-
pub fn persist_manager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>(
99-
data_dir: String,
100-
manager: &ChannelManager<Signer, M, T, K, F, L>
101-
) -> Result<(), std::io::Error>
102-
where
103-
M::Target: chain::Watch<Signer>,
104-
T::Target: BroadcasterInterface,
105-
K::Target: KeysInterface<Signer=Signer>,
106-
F::Target: FeeEstimator,
107-
L::Target: Logger,
108-
{
109-
let path = PathBuf::from(data_dir);
110-
util::write_to_file(path, "manager".to_string(), manager)
111-
}
112-
113-
/// Write the provided `NetworkGraph` to the path provided at `FilesystemPersister`
114-
/// initialization, within a file called "network_graph"
115-
pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
116-
let path = PathBuf::from(data_dir);
117-
util::write_to_file(path, "network_graph".to_string(), network_graph)
118-
}
119-
12058
/// Read `ChannelMonitor`s from disk.
12159
pub fn read_channelmonitors<Signer: Sign, K: Deref> (
12260
&self, keys_manager: K
12361
) -> Result<Vec<(BlockHash, ChannelMonitor<Signer>)>, std::io::Error>
12462
where K::Target: KeysInterface<Signer=Signer> + Sized,
12563
{
126-
let path = self.path_to_monitor_data();
64+
let mut path = PathBuf::from(&self.path_to_channel_data);
65+
path.push("monitors");
12766
if !Path::new(&path).exists() {
12867
return Ok(Vec::new());
12968
}
@@ -180,22 +119,11 @@ impl FilesystemPersister {
180119
}
181120
}
182121

183-
impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister {
184-
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
185-
// down once these start returning failure.
186-
// A PermanentFailure implies we need to shut down since we're force-closing channels without
187-
// even broadcasting!
188-
189-
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
190-
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
191-
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
192-
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
193-
}
194-
195-
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
196-
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
197-
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
198-
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
122+
impl KVStorePersister for FilesystemPersister {
123+
fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
124+
let mut dest_file = PathBuf::from(self.path_to_channel_data.clone());
125+
dest_file.push(key);
126+
util::write_to_file(dest_file, object)
199127
}
200128
}
201129

0 commit comments

Comments
 (0)