Skip to content

Commit a94c096

Browse files
committed
Add FilesystemStore
We upstream the `FilesystemStore` implementation, which is backwards compatible with `lightning-persister::FilesystemPersister`.
1 parent 29ff0d3 commit a94c096

File tree

4 files changed

+443
-0
lines changed

4 files changed

+443
-0
lines changed

lightning-persister/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ libc = "0.2"
2020

2121
[target.'cfg(windows)'.dependencies]
2222
winapi = { version = "0.3", features = ["winbase"] }
23+
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
2324

2425
[target.'cfg(ldk_bench)'.dependencies]
2526
criterion = { version = "0.4", optional = true, default-features = false }

lightning-persister/src/fs_store.rs

Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
//! Objects related to [`FilesystemStore`] live here.
2+
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
3+
4+
use lightning::util::persist::KVStore;
5+
use lightning::util::string::PrintableString;
6+
7+
use std::collections::HashMap;
8+
use std::fs;
9+
use std::io::{Read, Write};
10+
use std::path::{Path, PathBuf};
11+
use std::sync::atomic::{AtomicUsize, Ordering};
12+
use std::sync::{Arc, Mutex, RwLock};
13+
14+
#[cfg(target_os = "windows")]
15+
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
16+
17+
#[cfg(target_os = "windows")]
18+
macro_rules! call {
19+
($e: expr) => {
20+
if $e != 0 {
21+
Ok(())
22+
} else {
23+
Err(std::io::Error::last_os_error())
24+
}
25+
};
26+
}
27+
28+
#[cfg(target_os = "windows")]
29+
fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
30+
path.as_ref().encode_wide().chain(Some(0)).collect()
31+
}
32+
33+
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34+
const GC_LOCK_INTERVAL: usize = 25;
35+
36+
/// A [`KVStore`] implementation that writes to and reads from the file system.
37+
pub struct FilesystemStore {
38+
data_dir: PathBuf,
39+
tmp_file_counter: AtomicUsize,
40+
gc_counter: AtomicUsize,
41+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
42+
}
43+
44+
impl FilesystemStore {
45+
/// Constructs a new [`FilesystemStore`].
46+
pub fn new(data_dir: PathBuf) -> Self {
47+
let locks = Mutex::new(HashMap::new());
48+
let tmp_file_counter = AtomicUsize::new(0);
49+
let gc_counter = AtomicUsize::new(1);
50+
Self { data_dir, tmp_file_counter, gc_counter, locks }
51+
}
52+
53+
/// Returns the data directory.
54+
pub fn get_data_dir(&self) -> PathBuf {
55+
self.data_dir.clone()
56+
}
57+
58+
fn garbage_collect_locks(&self) {
59+
let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
60+
61+
if gc_counter % GC_LOCK_INTERVAL == 0 {
62+
// Take outer lock for the cleanup.
63+
let mut outer_lock = self.locks.lock().unwrap();
64+
65+
// Garbage collect all lock entries that are not referenced anymore.
66+
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
67+
}
68+
}
69+
70+
fn get_dest_dir_path(&self, namespace: &str, sub_namespace: &str) -> std::io::Result<PathBuf> {
71+
let mut dest_dir_path = {
72+
#[cfg(target_os = "windows")]
73+
{
74+
let data_dir = self.data_dir.clone();
75+
fs::create_dir_all(data_dir.clone())?;
76+
fs::canonicalize(data_dir)?
77+
}
78+
#[cfg(not(target_os = "windows"))]
79+
{
80+
self.data_dir.clone()
81+
}
82+
};
83+
84+
dest_dir_path.push(namespace);
85+
if !sub_namespace.is_empty() {
86+
dest_dir_path.push(sub_namespace);
87+
}
88+
89+
Ok(dest_dir_path)
90+
}
91+
}
92+
93+
impl KVStore for FilesystemStore {
94+
fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
95+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?;
96+
97+
let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
98+
dest_file_path.push(key);
99+
100+
let mut buf = Vec::new();
101+
{
102+
let inner_lock_ref = {
103+
let mut outer_lock = self.locks.lock().unwrap();
104+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
105+
};
106+
let _guard = inner_lock_ref.read().unwrap();
107+
108+
let mut f = fs::File::open(dest_file_path)?;
109+
f.read_to_end(&mut buf)?;
110+
}
111+
112+
self.garbage_collect_locks();
113+
114+
Ok(buf)
115+
}
116+
117+
fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
118+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?;
119+
120+
let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
121+
dest_file_path.push(key);
122+
123+
let parent_directory = dest_file_path
124+
.parent()
125+
.ok_or_else(|| {
126+
let msg =
127+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
128+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
129+
})?;
130+
fs::create_dir_all(&parent_directory)?;
131+
132+
// Do a crazy dance with lots of fsync()s to be overly cautious here...
133+
// We never want to end up in a state where we've lost the old data, or end up using the
134+
// old data on power loss after we've returned.
135+
// The way to atomically write a file on Unix platforms is:
136+
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
137+
let mut tmp_file_path = dest_file_path.clone();
138+
let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
139+
tmp_file_path.set_extension(tmp_file_ext);
140+
141+
{
142+
let mut tmp_file = fs::File::create(&tmp_file_path)?;
143+
tmp_file.write_all(&buf)?;
144+
tmp_file.sync_all()?;
145+
}
146+
147+
let res = {
148+
let inner_lock_ref = {
149+
let mut outer_lock = self.locks.lock().unwrap();
150+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
151+
};
152+
let _guard = inner_lock_ref.write().unwrap();
153+
154+
#[cfg(not(target_os = "windows"))]
155+
{
156+
fs::rename(&tmp_file_path, &dest_file_path)?;
157+
let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
158+
dir_file.sync_all()?;
159+
Ok(())
160+
}
161+
162+
#[cfg(target_os = "windows")]
163+
{
164+
let res = if dest_file_path.exists() {
165+
call!(unsafe {
166+
windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
167+
path_to_windows_str(dest_file_path.clone()).as_ptr(),
168+
path_to_windows_str(tmp_file_path).as_ptr(),
169+
std::ptr::null(),
170+
windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
171+
std::ptr::null_mut() as *const core::ffi::c_void,
172+
std::ptr::null_mut() as *const core::ffi::c_void,
173+
)
174+
})
175+
} else {
176+
call!(unsafe {
177+
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
178+
path_to_windows_str(tmp_file_path).as_ptr(),
179+
path_to_windows_str(dest_file_path.clone()).as_ptr(),
180+
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
181+
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
182+
)
183+
})
184+
};
185+
186+
match res {
187+
Ok(()) => {
188+
// We fsync the dest file in hopes this will also flush the metadata to disk.
189+
let dest_file = fs::OpenOptions::new().read(true).write(true)
190+
.open(&dest_file_path)?;
191+
dest_file.sync_all()?;
192+
Ok(())
193+
}
194+
Err(e) => Err(e),
195+
}
196+
}
197+
};
198+
199+
self.garbage_collect_locks();
200+
201+
res
202+
}
203+
204+
fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> {
205+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?;
206+
207+
let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
208+
dest_file_path.push(key);
209+
210+
if !dest_file_path.is_file() {
211+
return Ok(());
212+
}
213+
214+
{
215+
let inner_lock_ref = {
216+
let mut outer_lock = self.locks.lock().unwrap();
217+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
218+
};
219+
let _guard = inner_lock_ref.write().unwrap();
220+
221+
if lazy {
222+
// If we're lazy we just call remove and be done with it.
223+
fs::remove_file(&dest_file_path)?;
224+
} else {
225+
// If we're not lazy we try our best to persist the updated metadata to ensure
226+
// atomicity of this call.
227+
#[cfg(not(target_os = "windows"))]
228+
{
229+
fs::remove_file(&dest_file_path)?;
230+
231+
let parent_directory = dest_file_path.parent().ok_or_else(|| {
232+
let msg =
233+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
234+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
235+
})?;
236+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
237+
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
238+
// to the inode might get cached (and hence possibly lost on crash), depending on
239+
// the target platform and file system.
240+
//
241+
// In order to assert we permanently removed the file in question we therefore
242+
// call `fsync` on the parent directory on platforms that support it.
243+
dir_file.sync_all()?;
244+
}
245+
246+
#[cfg(target_os = "windows")]
247+
{
248+
// Since Windows `DeleteFile` API is not persisted until the last open file handle
249+
// is dropped, and there seemingly is no reliable way to flush the directory
250+
// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
251+
// file to be deleted to a temporary trash file and remove the latter file
252+
// afterwards.
253+
//
254+
// This should be marginally better, as, according to the documentation,
255+
// `MoveFileExW` APIs should offer stronger persistence guarantees,
256+
// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
257+
// However, all this is partially based on assumptions and local experiments, as
258+
// Windows API is horribly underdocumented.
259+
let mut trash_file_path = dest_file_path.clone();
260+
let trash_file_ext = format!("{}.trash",
261+
self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
262+
trash_file_path.set_extension(trash_file_ext);
263+
264+
call!(unsafe {
265+
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
266+
path_to_windows_str(dest_file_path).as_ptr(),
267+
path_to_windows_str(trash_file_path.clone()).as_ptr(),
268+
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
269+
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
270+
)
271+
})?;
272+
273+
{
274+
// We fsync the trash file in hopes this will also flush the original's file
275+
// metadata to disk.
276+
let trash_file = fs::OpenOptions::new().read(true).write(true)
277+
.open(&trash_file_path.clone())?;
278+
trash_file.sync_all()?;
279+
}
280+
281+
// We're fine if this remove would fail as the trash file will be cleaned up in
282+
// list eventually.
283+
fs::remove_file(trash_file_path).ok();
284+
}
285+
}
286+
}
287+
288+
self.garbage_collect_locks();
289+
290+
Ok(())
291+
}
292+
293+
fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result<Vec<String>> {
294+
check_namespace_key_validity(namespace, sub_namespace, None, "list")?;
295+
296+
let prefixed_dest = self.get_dest_dir_path(namespace, sub_namespace)?;
297+
let mut keys = Vec::new();
298+
299+
if !Path::new(&prefixed_dest).exists() {
300+
return Ok(Vec::new());
301+
}
302+
303+
for entry in fs::read_dir(&prefixed_dest)? {
304+
let entry = entry?;
305+
let p = entry.path();
306+
307+
if let Some(ext) = p.extension() {
308+
#[cfg(target_os = "windows")]
309+
{
310+
// Clean up any trash files lying around.
311+
if ext == "trash" {
312+
fs::remove_file(p).ok();
313+
continue;
314+
}
315+
}
316+
if ext == "tmp" {
317+
continue;
318+
}
319+
}
320+
321+
let metadata = p.metadata()?;
322+
323+
// We allow the presence of directories in the empty namespace and just skip them.
324+
if metadata.is_dir() {
325+
continue;
326+
}
327+
328+
// If we otherwise don't find a file at the given path something went wrong.
329+
if !metadata.is_file() {
330+
debug_assert!(false, "Failed to list keys of {}/{}: file couldn't be accessed.",
331+
PrintableString(namespace), PrintableString(sub_namespace));
332+
let msg = format!("Failed to list keys of {}/{}: file couldn't be accessed.",
333+
PrintableString(namespace), PrintableString(sub_namespace));
334+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
335+
}
336+
337+
match p.strip_prefix(&prefixed_dest) {
338+
Ok(stripped_path) => {
339+
if let Some(relative_path) = stripped_path.to_str() {
340+
if is_valid_kvstore_str(relative_path) {
341+
keys.push(relative_path.to_string())
342+
}
343+
} else {
344+
debug_assert!(false, "Failed to list keys of {}/{}: file path is not valid UTF-8",
345+
PrintableString(namespace), PrintableString(sub_namespace));
346+
let msg = format!("Failed to list keys of {}/{}: file path is not valid UTF-8",
347+
PrintableString(namespace), PrintableString(sub_namespace));
348+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
349+
}
350+
}
351+
Err(e) => {
352+
debug_assert!(false, "Failed to list keys of {}/{}: {}",
353+
PrintableString(namespace), PrintableString(sub_namespace), e);
354+
let msg = format!("Failed to list keys of {}/{}: {}",
355+
PrintableString(namespace), PrintableString(sub_namespace), e);
356+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
357+
}
358+
}
359+
}
360+
361+
self.garbage_collect_locks();
362+
363+
Ok(keys)
364+
}
365+
}
366+
367+
#[cfg(test)]
368+
mod tests {
369+
use super::*;
370+
use crate::test_utils::do_read_write_remove_list_persist;
371+
372+
#[test]
373+
fn read_write_remove_list_persist() {
374+
let mut temp_path = std::env::temp_dir();
375+
temp_path.push("test_read_write_remove_list_persist");
376+
let fs_store = FilesystemStore::new(temp_path);
377+
do_read_write_remove_list_persist(&fs_store);
378+
}
379+
}

lightning-persister/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010

1111
#[cfg(ldk_bench)] extern crate criterion;
1212

13+
pub mod fs_store;
14+
15+
mod utils;
16+
1317
#[cfg(test)]
1418
mod test_utils;
1519

0 commit comments

Comments
 (0)