Skip to content

Commit c3729d7

Browse files
Add a sample module FilesystemPersister.
This adds a new lightning-data-persister crate, that uses the newly exposed lightning crate's test utilities. Notably, this crate is pretty small right now. However, due to future plans to add more data persistence (e.g. persisting the ChannelManager, etc) and a desire to avoid pulling in filesystem usage into the core lightning package, it is best for it to be separated out.
1 parent 5e3aadc commit c3729d7

File tree

3 files changed

+288
-0
lines changed

3 files changed

+288
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
members = [
44
"lightning",
55
"lightning-net-tokio",
6+
"lightning-data-persister",
67
]
78

89
# Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it

lightning-data-persister/Cargo.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "lightning-data-persister"
3+
version = "0.0.1"
4+
authors = ["Valentine Wallace", "Matt Corallo"]
5+
license = "Apache-2.0"
6+
description = """
7+
Utilities to manage channel data persistence and retrieval.
8+
"""
9+
10+
[dependencies]
11+
bitcoin = "0.24"
12+
lightning = { version = "0.0.11", path = "../lightning" }
13+
libc = "0.2"
14+
15+
[dev-dependencies.bitcoin]
16+
version = "0.24"
17+
features = ["bitcoinconsensus"]
18+
19+
[dev-dependencies]
20+
lightning = { version = "0.0.11", path = "../lightning", features = ["_test_utils"] }

lightning-data-persister/src/lib.rs

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
extern crate lightning;
2+
extern crate bitcoin;
3+
extern crate libc;
4+
5+
use bitcoin::hashes::hex::ToHex;
6+
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr};
7+
use lightning::chain::keysinterface::ChannelKeys;
8+
use lightning::chain::transaction::OutPoint;
9+
use lightning::ln::data_persister::ChannelDataPersister;
10+
use lightning::util::ser::{Writeable, Readable};
11+
use std::fs;
12+
use std::io::Error;
13+
use std::marker::PhantomData;
14+
use std::path::Path;
15+
16+
#[cfg(test)]
17+
use bitcoin::{BlockHash, Txid};
18+
use bitcoin::hashes::hex::FromHex;
19+
use std::collections::HashMap;
20+
use std::io::Cursor;
21+
22+
#[cfg(not(target_os = "windows"))]
23+
use std::os::unix::io::AsRawFd;
24+
25+
/// FilesystemPersister can persist channel data on disk on Linux machines, where
26+
/// each channel's data is stored in a file named after its funding outpoint.
27+
///
28+
/// Warning: this module does the best it can with calls to persist data, but it
29+
/// can only guarantee that the data is passed to the drive. It is up to the
30+
/// drive manufacturers to do the actual persistence properly, which they often
31+
/// don't (especially on consumer-grade hardware). Therefore, it is up to the
32+
/// user to validate their entire storage stack, to ensure the writes are
33+
/// persistent.
34+
/// Corollary: especially when dealing with larger amounts of money, it is best
35+
/// practice to have multiple channel data backups and not rely only on the
36+
/// FilesystemPersister.
37+
pub struct FilesystemPersister<ChanSigner: ChannelKeys + Readable + Writeable> {
38+
path_to_channel_data: String,
39+
phantom: PhantomData<ChanSigner>, // TODO: is there a way around this?
40+
}
41+
42+
trait DiskWriteable {
43+
fn write(&self, writer: &mut fs::File) -> Result<(), Error>;
44+
}
45+
46+
impl<ChanSigner: ChannelKeys + Writeable> DiskWriteable for ChannelMonitor<ChanSigner> {
47+
fn write(&self, writer: &mut fs::File) -> Result<(), Error> {
48+
self.write_for_disk(writer)
49+
}
50+
}
51+
52+
impl<ChanSigner: ChannelKeys + Readable + Writeable> FilesystemPersister<ChanSigner> {
53+
/// Initialize a new FilesystemPersister and set the path to the individual channels'
54+
/// files.
55+
pub fn new(path_to_channel_data: String) -> Self {
56+
return Self {
57+
path_to_channel_data,
58+
phantom: PhantomData,
59+
}
60+
}
61+
62+
fn get_full_filepath(&self, funding_txo: OutPoint) -> String {
63+
format!("{}/{}_{}", self.path_to_channel_data, funding_txo.txid.to_hex(), funding_txo.index)
64+
}
65+
66+
// Utility to write a file to disk.
67+
// Note: we may eventually want to change this model to a queue-based system, such that
68+
// writes are queued to a separate writer thread. This would improve performance
69+
// since syncing to disk can sometimes take 100s of milliseconds.
70+
fn write_channel_data(&self, funding_txo: OutPoint, monitor: &dyn DiskWriteable) -> std::io::Result<()> {
71+
fs::create_dir_all(&self.path_to_channel_data)?;
72+
// Do a crazy dance with lots of fsync()s to be overly cautious here...
73+
// We never want to end up in a state where we've lost the old data, or end up using the
74+
// old data on power loss after we've returned.
75+
// The way to atomically write a file on Unix platforms is:
76+
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
77+
let filename = self.get_full_filepath(funding_txo);
78+
let tmp_filename = filename.clone() + ".tmp";
79+
80+
{
81+
// Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use
82+
// rust stdlib 1.36 or higher.
83+
let mut f = fs::File::create(&tmp_filename)?;
84+
monitor.write(&mut f)?;
85+
f.sync_all()?;
86+
}
87+
fs::rename(&tmp_filename, &filename)?;
88+
// Fsync the parent directory on Unix.
89+
#[cfg(not(target_os = "windows"))]
90+
{
91+
let path_str = filename.clone();
92+
let path = Path::new(&path_str).parent().unwrap();
93+
let dir_file = fs::OpenOptions::new().read(true).open(path)?;
94+
unsafe { libc::fsync(dir_file.as_raw_fd()); }
95+
}
96+
Ok(())
97+
}
98+
99+
#[cfg(test)]
100+
fn load_channel_data(&self) -> Result<HashMap<OutPoint, ChannelMonitor<ChanSigner>>, ChannelMonitorUpdateErr> {
101+
if let Err(_) = fs::create_dir_all(&self.path_to_channel_data) {
102+
return Err(ChannelMonitorUpdateErr::TemporaryFailure);
103+
}
104+
let mut res = HashMap::new();
105+
for file_option in fs::read_dir(&self.path_to_channel_data).unwrap() {
106+
let mut loaded = false;
107+
let file = file_option.unwrap();
108+
if let Some(filename) = file.file_name().to_str() {
109+
if filename.is_ascii() && filename.len() > 65 {
110+
if let Ok(txid) = Txid::from_hex(filename.split_at(64).0) {
111+
if let Ok(index) = filename.split_at(65).1.split('.').next().unwrap().parse() {
112+
if let Ok(contents) = fs::read(&file.path()) {
113+
if let Ok((_, loaded_monitor)) = <(BlockHash, ChannelMonitor<ChanSigner>)>::read(&mut Cursor::new(&contents)) {
114+
res.insert(OutPoint { txid, index }, loaded_monitor);
115+
loaded = true;
116+
}
117+
}
118+
}
119+
}
120+
}
121+
}
122+
if !loaded {
123+
println!("WARNING: Failed to read one of the channel monitor storage files! Check perms!");
124+
}
125+
}
126+
Ok(res)
127+
}
128+
}
129+
130+
impl<ChanSigner: ChannelKeys + Readable + Writeable + Send + Sync> ChannelDataPersister for FilesystemPersister<ChanSigner> {
131+
type Keys = ChanSigner;
132+
133+
fn persist_channel_data(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<Self::Keys>) -> Result<(), ChannelMonitorUpdateErr> {
134+
match self.write_channel_data(funding_txo, monitor) {
135+
Ok(_) => Ok(()),
136+
Err(_) => Err(ChannelMonitorUpdateErr::TemporaryFailure)
137+
}
138+
}
139+
140+
fn update_channel_data(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr> {
141+
match self.write_channel_data(funding_txo, monitor) {
142+
Ok(_) => Ok(()),
143+
Err(_) => Err(ChannelMonitorUpdateErr::TemporaryFailure)
144+
}
145+
}
146+
147+
}
148+
149+
#[cfg(test)]
150+
mod tests {
151+
extern crate lightning;
152+
extern crate bitcoin;
153+
use crate::FilesystemPersister;
154+
use bitcoin::hashes::hex::FromHex;
155+
use bitcoin::blockdata::block::{Block, BlockHeader};
156+
use bitcoin::Txid;
157+
use DiskWriteable;
158+
use Error;
159+
use lightning::{check_closed_broadcast, check_added_monitors};
160+
use lightning::chain::transaction::OutPoint;
161+
use lightning::ln::features::InitFeatures;
162+
use lightning::ln::functional_test_utils::*;
163+
use lightning::ln::msgs::ErrorAction;
164+
use lightning::util::enforcing_trait_impls::EnforcingChannelKeys;
165+
use lightning::util::events::{MessageSendEventsProvider, MessageSendEvent};
166+
use lightning::util::ser::Writer;
167+
use lightning::util::test_utils;
168+
use std::fs;
169+
use std::io;
170+
171+
struct TestSerializable {}
172+
impl DiskWriteable for TestSerializable {
173+
fn write(&self, writer: &mut fs::File) -> Result<(), Error> {
174+
writer.write_all(&[42; 1])
175+
}
176+
}
177+
178+
#[test]
179+
fn test_filesystem_data_persister() {
180+
// Create the nodes, giving them FilesystemPersisters for data persisters.
181+
let data_persister_0: FilesystemPersister<EnforcingChannelKeys> = FilesystemPersister::new("persister0".to_string());
182+
let data_persister_1: FilesystemPersister<EnforcingChannelKeys> = FilesystemPersister::new("persister1".to_string());
183+
let chanmon_cfgs = create_chanmon_cfgs(2);
184+
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
185+
let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &data_persister_0);
186+
let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &data_persister_1);
187+
node_cfgs[0].chain_monitor = chain_mon_0;
188+
node_cfgs[1].chain_monitor = chain_mon_1;
189+
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
190+
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
191+
192+
// Check that the persisted channel data is empty before any channels are
193+
// open.
194+
let mut persisted_chan_data_0 = data_persister_0.load_channel_data().unwrap();
195+
assert_eq!(persisted_chan_data_0.keys().len(), 0);
196+
let mut persisted_chan_data_1 = data_persister_1.load_channel_data().unwrap();
197+
assert_eq!(persisted_chan_data_1.keys().len(), 0);
198+
199+
// Helper to make sure the channel is on the expected update ID.
200+
macro_rules! check_persisted_data {
201+
($expected_update_id: expr) => {
202+
persisted_chan_data_0 = data_persister_0.load_channel_data().unwrap();
203+
assert_eq!(persisted_chan_data_0.keys().len(), 1);
204+
for mon in persisted_chan_data_0.values() {
205+
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
206+
}
207+
persisted_chan_data_1 = data_persister_1.load_channel_data().unwrap();
208+
assert_eq!(persisted_chan_data_1.keys().len(), 1);
209+
for mon in persisted_chan_data_1.values() {
210+
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
211+
}
212+
}
213+
}
214+
215+
// Create some initial channel and check that a channel was persisted.
216+
let _ = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
217+
check_persisted_data!(0);
218+
219+
// Send a few payments and make sure the monitors are updated to the latest.
220+
send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000, 8_000_000);
221+
check_persisted_data!(5);
222+
send_payment(&nodes[1], &vec!(&nodes[0])[..], 4000000, 4_000_000);
223+
check_persisted_data!(10);
224+
225+
// Close the channel and make sure everything is persisted as expected.
226+
// Force close because cooperative close doesn't result in any persisted
227+
// updates.
228+
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id);
229+
check_closed_broadcast!(nodes[0], false);
230+
check_added_monitors!(nodes[0], 1);
231+
232+
let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
233+
assert_eq!(node_txn.len(), 1);
234+
235+
let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
236+
connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[0].clone(), node_txn[0].clone()]}, 1);
237+
check_closed_broadcast!(nodes[1], false);
238+
check_added_monitors!(nodes[1], 1);
239+
check_persisted_data!(11);
240+
241+
fs::remove_dir_all("./persister0").unwrap();
242+
fs::remove_dir_all("./persister1").unwrap();
243+
}
244+
245+
#[test]
246+
fn test_readonly_dir() {
247+
let data_persister: FilesystemPersister<EnforcingChannelKeys> = FilesystemPersister::new("persister".to_string());
248+
let test_writeable = TestSerializable{};
249+
let test_txo = OutPoint {
250+
txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
251+
index: 0
252+
};
253+
// Create the data persister's directory and set it to read-only.
254+
let path = &data_persister.path_to_channel_data;
255+
fs::create_dir_all(path).unwrap();
256+
let mut perms = fs::metadata(path).unwrap().permissions();
257+
perms.set_readonly(true);
258+
fs::set_permissions(path, perms).unwrap();
259+
match data_persister.write_channel_data(test_txo, &test_writeable) {
260+
Err(e) => {
261+
assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
262+
}
263+
_ => panic!("Unexpected error message")
264+
}
265+
fs::remove_dir_all("./persister").unwrap();
266+
}
267+
}

0 commit comments

Comments
 (0)