Skip to content

Commit 4314915

Browse files
implement Persist and Persister with generic KVStorePersister trait
1 parent 711bcef commit 4314915

File tree

5 files changed

+136
-151
lines changed

5 files changed

+136
-151
lines changed

lightning-background-processor/src/lib.rs

+24-38
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescr
2020
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
2121
use lightning::util::events::{Event, EventHandler, EventsProvider};
2222
use lightning::util::logger::Logger;
23+
use lightning::util::persist::Persister;
2324
use std::sync::Arc;
2425
use std::sync::atomic::{AtomicBool, Ordering};
2526
use std::thread;
@@ -80,22 +81,7 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
8081
#[cfg(test)]
8182
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
8283

83-
/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
84-
pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
85-
where
86-
M::Target: 'static + chain::Watch<Signer>,
87-
T::Target: 'static + BroadcasterInterface,
88-
K::Target: 'static + KeysInterface<Signer = Signer>,
89-
F::Target: 'static + FeeEstimator,
90-
L::Target: 'static + Logger,
91-
{
92-
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
93-
/// (which will cause the [`BackgroundProcessor`] which called this method to exit).
94-
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
95-
96-
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
97-
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
98-
}
84+
9985

10086
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
10187
struct DecoratingEventHandler<
@@ -133,6 +119,7 @@ impl BackgroundProcessor {
133119
/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
134120
/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
135121
/// either [`join`] or [`stop`].
122+
///
136123
///
137124
/// # Data Persistence
138125
///
@@ -142,7 +129,7 @@ impl BackgroundProcessor {
142129
/// provided implementation.
143130
///
144131
/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
145-
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
132+
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_graph`]
146133
/// for Rust-Lightning's provided implementation.
147134
///
148135
/// Typically, users should either implement [`Persister::persist_manager`] to never return an
@@ -161,8 +148,8 @@ impl BackgroundProcessor {
161148
/// [`stop`]: Self::stop
162149
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
163150
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
164-
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
165-
/// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
151+
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister#impl-Persister
152+
/// [`FilesystemPersister::persist_graph`]: lightning_persister::FilesystemPersister#impl-Persister
166153
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
167154
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
168155
pub fn start<
@@ -365,10 +352,11 @@ mod tests {
365352
use lightning::util::logger::Logger;
366353
use lightning::util::ser::Writeable;
367354
use lightning::util::test_utils;
355+
use lightning::util::persist::KVStorePersister;
368356
use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
369357
use lightning_invoice::utils::DefaultRouter;
370-
use lightning_persister::FilesystemPersister;
371-
use std::fs;
358+
use lightning_persister::{FilesystemPersister};
359+
use std::fs::{self, File};
372360
use std::ops::Deref;
373361
use std::path::PathBuf;
374362
use std::sync::{Arc, Mutex};
@@ -414,7 +402,7 @@ mod tests {
414402
struct Persister {
415403
data_dir: String,
416404
graph_error: Option<(std::io::ErrorKind, &'static str)>,
417-
manager_error: Option<(std::io::ErrorKind, &'static str)>
405+
manager_error: Option<(std::io::ErrorKind, &'static str)>,
418406
}
419407

420408
impl Persister {
@@ -431,25 +419,23 @@ mod tests {
431419
}
432420
}
433421

434-
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
435-
M::Target: 'static + chain::Watch<Signer>,
436-
T::Target: 'static + BroadcasterInterface,
437-
K::Target: 'static + KeysInterface<Signer = Signer>,
438-
F::Target: 'static + FeeEstimator,
439-
L::Target: 'static + Logger,
440-
{
441-
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
442-
match self.manager_error {
443-
None => FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager),
444-
Some((error, message)) => Err(std::io::Error::new(error, message)),
422+
impl KVStorePersister for Persister {
423+
fn persist<W: Writeable>(&self, key: String, object: &W) -> std::io::Result<()> {
424+
if key == self.get_channel_manager_key() {
425+
return match self.manager_error {
426+
None => Ok(()),
427+
Some((error, message)) => Err(std::io::Error::new(error, message))
428+
}
445429
}
446-
}
447430

448-
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
449-
match self.graph_error {
450-
None => FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph),
451-
Some((error, message)) => Err(std::io::Error::new(error, message)),
431+
if key == self.get_network_graph_key() {
432+
return match self.graph_error {
433+
None => Ok(()),
434+
Some((error, message)) => Err(std::io::Error::new(error, message))
435+
}
452436
}
437+
438+
Ok(())
453439
}
454440
}
455441

lightning-persister/src/lib.rs

+11-82
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,15 @@ extern crate bitcoin;
1515
extern crate libc;
1616

1717
use bitcoin::hash_types::{BlockHash, Txid};
18-
use bitcoin::hashes::hex::{FromHex, ToHex};
19-
use lightning::routing::network_graph::NetworkGraph;
20-
use crate::util::DiskWriteable;
21-
use lightning::chain;
22-
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
23-
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
24-
use lightning::chain::chainmonitor;
18+
use bitcoin::hashes::hex::FromHex;
19+
use lightning::chain::channelmonitor::ChannelMonitor;
2520
use lightning::chain::keysinterface::{Sign, KeysInterface};
26-
use lightning::chain::transaction::OutPoint;
27-
use lightning::ln::channelmanager::ChannelManager;
28-
use lightning::util::logger::Logger;
2921
use lightning::util::ser::{ReadableArgs, Writeable};
22+
use lightning::util::persist::KVStorePersister;
3023
use std::fs;
31-
use std::io::{Cursor, Error};
24+
use std::io::Cursor;
3225
use std::ops::Deref;
33-
use std::path::{Path, PathBuf};
26+
use std::path::{Path, PathBuf, MAIN_SEPARATOR};
3427

3528
/// FilesystemPersister persists channel data on disk, where each channel's
3629
/// data is stored in a file named after its funding outpoint.
@@ -48,31 +41,6 @@ pub struct FilesystemPersister {
4841
path_to_channel_data: String,
4942
}
5043

51-
impl<Signer: Sign> DiskWriteable for ChannelMonitor<Signer> {
52-
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), Error> {
53-
self.write(writer)
54-
}
55-
}
56-
57-
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> DiskWriteable for ChannelManager<Signer, M, T, K, F, L>
58-
where
59-
M::Target: chain::Watch<Signer>,
60-
T::Target: BroadcasterInterface,
61-
K::Target: KeysInterface<Signer=Signer>,
62-
F::Target: FeeEstimator,
63-
L::Target: Logger,
64-
{
65-
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
66-
self.write(writer)
67-
}
68-
}
69-
70-
impl DiskWriteable for NetworkGraph {
71-
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
72-
self.write(writer)
73-
}
74-
}
75-
7644
impl FilesystemPersister {
7745
/// Initialize a new FilesystemPersister and set the path to the individual channels'
7846
/// files.
@@ -87,34 +55,8 @@ impl FilesystemPersister {
8755
self.path_to_channel_data.clone()
8856
}
8957

90-
pub(crate) fn path_to_monitor_data(&self) -> PathBuf {
91-
let mut path = PathBuf::from(self.path_to_channel_data.clone());
92-
path.push("monitors");
93-
path
94-
}
95-
96-
/// Writes the provided `ChannelManager` to the path provided at `FilesystemPersister`
97-
/// initialization, within a file called "manager".
98-
pub fn persist_manager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>(
99-
data_dir: String,
100-
manager: &ChannelManager<Signer, M, T, K, F, L>
101-
) -> Result<(), std::io::Error>
102-
where
103-
M::Target: chain::Watch<Signer>,
104-
T::Target: BroadcasterInterface,
105-
K::Target: KeysInterface<Signer=Signer>,
106-
F::Target: FeeEstimator,
107-
L::Target: Logger,
108-
{
109-
let path = PathBuf::from(data_dir);
110-
util::write_to_file(path, "manager".to_string(), manager)
111-
}
112-
113-
/// Write the provided `NetworkGraph` to the path provided at `FilesystemPersister`
114-
/// initialization, within a file called "network_graph"
115-
pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
116-
let path = PathBuf::from(data_dir);
117-
util::write_to_file(path, "network_graph".to_string(), network_graph)
58+
pub(crate) fn path_to_monitor_data(&self) -> String {
59+
format!("{}{}monitors", self.path_to_channel_data, MAIN_SEPARATOR)
11860
}
11961

12062
/// Read `ChannelMonitor`s from disk.
@@ -124,7 +66,7 @@ impl FilesystemPersister {
12466
where K::Target: KeysInterface<Signer=Signer> + Sized,
12567
{
12668
let path = self.path_to_monitor_data();
127-
if !Path::new(&path).exists() {
69+
if !Path::new(&PathBuf::from(&path)).exists() {
12870
return Ok(Vec::new());
12971
}
13072
let mut res = Vec::new();
@@ -180,22 +122,9 @@ impl FilesystemPersister {
180122
}
181123
}
182124

183-
impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister {
184-
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
185-
// down once these start returning failure.
186-
// A PermanentFailure implies we need to shut down since we're force-closing channels without
187-
// even broadcasting!
188-
189-
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
190-
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
191-
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
192-
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
193-
}
194-
195-
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
196-
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
197-
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
198-
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
125+
impl KVStorePersister for FilesystemPersister {
126+
fn persist<W: Writeable>(&self, key: String, object: &W) -> std::io::Result<()> {
127+
util::write_to_file(format!("{}{}{}", self.path_to_channel_data, MAIN_SEPARATOR, key), object)
199128
}
200129
}
201130

lightning-persister/src/util.rs

+32-31
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,14 @@ use std::path::{Path, PathBuf};
77
#[cfg(not(target_os = "windows"))]
88
use std::os::unix::io::AsRawFd;
99

10+
use lightning::util::ser::Writeable;
11+
1012
#[cfg(target_os = "windows")]
1113
use {
1214
std::ffi::OsStr,
1315
std::os::windows::ffi::OsStrExt
1416
};
1517

16-
pub(crate) trait DiskWriteable {
17-
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error>;
18-
}
19-
20-
pub(crate) fn get_full_filepath(mut filepath: PathBuf, filename: String) -> String {
21-
filepath.push(filename);
22-
filepath.to_str().unwrap().to_string()
23-
}
24-
2518
#[cfg(target_os = "windows")]
2619
macro_rules! call {
2720
($e: expr) => (
@@ -39,21 +32,25 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::W
3932
}
4033

4134
#[allow(bare_trait_objects)]
42-
pub(crate) fn write_to_file<D: DiskWriteable>(path: PathBuf, filename: String, data: &D) -> std::io::Result<()> {
43-
fs::create_dir_all(path.clone())?;
35+
pub(crate) fn write_to_file<W: Writeable>(filename_with_path: String, data: &W) -> std::io::Result<()> {
36+
let mut tmp_filename = filename_with_path.clone();
37+
tmp_filename.push_str(".tmp");
38+
39+
let full_path = PathBuf::from(&filename_with_path);
40+
let path = full_path.parent().unwrap();
41+
fs::create_dir_all(path)?;
4442
// Do a crazy dance with lots of fsync()s to be overly cautious here...
4543
// We never want to end up in a state where we've lost the old data, or end up using the
4644
// old data on power loss after we've returned.
4745
// The way to atomically write a file on Unix platforms is:
4846
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
49-
let filename_with_path = get_full_filepath(path, filename);
50-
let tmp_filename = format!("{}.tmp", filename_with_path.clone());
47+
5148

5249
{
5350
// Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use
5451
// rust stdlib 1.36 or higher.
5552
let mut f = fs::File::create(&tmp_filename)?;
56-
data.write_to_file(&mut f)?;
53+
data.write(&mut f)?;
5754
f.sync_all()?;
5855
}
5956
// Fsync the parent directory on Unix.
@@ -87,15 +84,17 @@ pub(crate) fn write_to_file<D: DiskWriteable>(path: PathBuf, filename: String, d
8784

8885
#[cfg(test)]
8986
mod tests {
90-
use super::{DiskWriteable, get_full_filepath, write_to_file};
87+
use lightning::util::ser::{Writer, Writeable};
88+
89+
use super::{write_to_file};
9190
use std::fs;
9291
use std::io;
9392
use std::io::Write;
94-
use std::path::PathBuf;
93+
use std::path::{PathBuf, MAIN_SEPARATOR};
9594

9695
struct TestWriteable{}
97-
impl DiskWriteable for TestWriteable {
98-
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), io::Error> {
96+
impl Writeable for TestWriteable {
97+
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), std::io::Error> {
9998
writer.write_all(&[42; 1])
10099
}
101100
}
@@ -113,7 +112,8 @@ mod tests {
113112
let mut perms = fs::metadata(path.to_string()).unwrap().permissions();
114113
perms.set_readonly(true);
115114
fs::set_permissions(path.to_string(), perms).unwrap();
116-
match write_to_file(PathBuf::from(path.to_string()), filename, &test_writeable) {
115+
116+
match write_to_file(format!("{}{}{}", path, MAIN_SEPARATOR, filename), &test_writeable) {
117117
Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied),
118118
_ => panic!("Unexpected error message")
119119
}
@@ -131,10 +131,11 @@ mod tests {
131131
fn test_rename_failure() {
132132
let test_writeable = TestWriteable{};
133133
let filename = "test_rename_failure_filename";
134-
let path = PathBuf::from("test_rename_failure_dir");
134+
let path = "test_rename_failure_dir";
135+
let full_file_path = format!("{}{}{}", path, MAIN_SEPARATOR, filename);
135136
// Create the channel data file and make it a directory.
136-
fs::create_dir_all(get_full_filepath(path.clone(), filename.to_string())).unwrap();
137-
match write_to_file(path.clone(), filename.to_string(), &test_writeable) {
137+
fs::create_dir_all(full_file_path.clone()).unwrap();
138+
match write_to_file(full_file_path, &test_writeable) {
138139
Err(e) => assert_eq!(e.raw_os_error(), Some(libc::EISDIR)),
139140
_ => panic!("Unexpected Ok(())")
140141
}
@@ -144,16 +145,17 @@ mod tests {
144145
#[test]
145146
fn test_diskwriteable_failure() {
146147
struct FailingWriteable {}
147-
impl DiskWriteable for FailingWriteable {
148-
fn write_to_file(&self, _writer: &mut fs::File) -> Result<(), std::io::Error> {
148+
impl Writeable for FailingWriteable {
149+
fn write<W: Writer>(&self, _writer: &mut W) -> Result<(), std::io::Error> {
149150
Err(std::io::Error::new(std::io::ErrorKind::Other, "expected failure"))
150151
}
151152
}
152153

153154
let filename = "test_diskwriteable_failure";
154-
let path = PathBuf::from("test_diskwriteable_failure_dir");
155+
let path = "test_diskwriteable_failure_dir";
155156
let test_writeable = FailingWriteable{};
156-
match write_to_file(path.clone(), filename.to_string(), &test_writeable) {
157+
let full_path = format!("{}{}{}", path, MAIN_SEPARATOR, filename);
158+
match write_to_file(full_path.clone(), &test_writeable) {
157159
Err(e) => {
158160
assert_eq!(e.kind(), std::io::ErrorKind::Other);
159161
assert_eq!(e.get_ref().unwrap().to_string(), "expected failure");
@@ -170,12 +172,11 @@ mod tests {
170172
fn test_tmp_file_creation_failure() {
171173
let test_writeable = TestWriteable{};
172174
let filename = "test_tmp_file_creation_failure_filename".to_string();
173-
let path = PathBuf::from("test_tmp_file_creation_failure_dir");
174-
175-
// Create the tmp file and make it a directory.
176-
let tmp_path = get_full_filepath(path.clone(), format!("{}.tmp", filename.clone()));
175+
let path = "test_tmp_file_creation_failure_dir";
176+
let tmp_path = format!("{}{}{}.tmp", path, MAIN_SEPARATOR, filename.clone());
177+
let full_filepath = format!("{}{}{}", path, MAIN_SEPARATOR, filename);
177178
fs::create_dir_all(tmp_path).unwrap();
178-
match write_to_file(path, filename, &test_writeable) {
179+
match write_to_file(full_filepath, &test_writeable) {
179180
Err(e) => {
180181
#[cfg(not(target_os = "windows"))]
181182
assert_eq!(e.raw_os_error(), Some(libc::EISDIR));

0 commit comments

Comments
 (0)