Skip to content

ChannelManager persistence #752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"lightning-block-sync",
"lightning-net-tokio",
"lightning-persister",
"background-processor",
]

# Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it.
Expand Down
15 changes: 15 additions & 0 deletions background-processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "background-processor"
version = "0.1.0"
authors = ["Valentine Wallace <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bitcoin = "0.24"
lightning = { version = "0.0.12", path = "../lightning", features = ["allow_wallclock_use"] }
lightning-persister = { version = "0.0.1", path = "../lightning-persister" }

[dev-dependencies]
lightning = { version = "0.0.12", path = "../lightning", features = ["_test_utils"] }
294 changes: 294 additions & 0 deletions background-processor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
#[macro_use] extern crate lightning;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::keysinterface::{ChannelKeys, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

/// BackgroundProcessor takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Monitoring whether the ChannelManager needs to be re-persisted to disk, and if so,
/// writing it to disk/backups by invoking the callback given to it at startup.
/// ChannelManager persistence should be done in the background.
/// * Calling `ChannelManager::timer_chan_freshness_every_min()` every minute (can be done in the
/// background).
Comment on lines +18 to +22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider removing the part within each bullet about being done in the background as it may be redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to distinguish between stuff that can be done in the background vs should be done in the background, which I think is a potentially useful piece of info.

///
/// Note that if ChannelManager persistence fails and the persisted manager becomes out-of-date,
/// then there is a risk of channels force-closing on startup when the manager realizes it's
/// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used
/// for unilateral chain closure fees are at risk.
pub struct BackgroundProcessor {
stop_thread: Arc<AtomicBool>,
/// May be used to retrieve and handle the error if `BackgroundProcessor`'s thread
/// exits due to an error while persisting.
pub thread_handle: JoinHandle<Result<(), std::io::Error>>,
}

#[cfg(not(test))]
const CHAN_FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const CHAN_FRESHNESS_TIMER: u64 = 1;

impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the top-level
/// documentation.
///
/// If `persist_manager` returns an error, then this thread will return said error (and `start()`
/// will need to be called again to restart the `BackgroundProcessor`). Users should wait on
/// [`thread_handle`]'s `join()` method to be able to tell if and when an error is returned, or
/// implement `persist_manager` such that an error is never returned to the `BackgroundProcessor`
///
/// `persist_manager` is responsible for writing out the `ChannelManager` to disk, and/or uploading
/// to one or more backup services. See [`ChannelManager::write`] for writing out a `ChannelManager`.
/// See [`FilesystemPersister::persist_manager`] for Rust-Lightning's provided implementation.
///
/// [`thread_handle`]: struct.BackgroundProcessor.html#structfield.thread_handle
/// [`ChannelManager::write`]: ../lightning/ln/channelmanager/struct.ChannelManager.html#method.write
/// [`FilesystemPersister::persist_manager`]: ../lightning_persister/struct.FilesystemPersister.html#impl
pub fn start<PM, ChanSigner, M, T, K, F, L>(persist_manager: PM, manager: Arc<ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>, logger: Arc<L>) -> Self
where ChanSigner: 'static + ChannelKeys + Writeable,
M: 'static + chain::Watch<Keys=ChanSigner>,
T: 'static + BroadcasterInterface,
K: 'static + KeysInterface<ChanKeySigner=ChanSigner>,
F: 'static + FeeEstimator,
L: 'static + Logger,
PM: 'static + Send + Fn(&ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>) -> Result<(), std::io::Error>,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
let mut current_time = Instant::now();
loop {
let updates_available = manager.wait_timeout(Duration::from_millis(100));
if updates_available {
persist_manager(&*manager)?;
}
// Exit the loop if the background processor was requested to stop.
if stop_thread.load(Ordering::Acquire) == true {
log_trace!(logger, "Terminating background processor.");
return Ok(())
}
if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER {
log_trace!(logger, "Calling manager's timer_chan_freshness_every_min");
manager.timer_chan_freshness_every_min();
current_time = Instant::now();
}
}
});
Self {
stop_thread: stop_thread_clone,
thread_handle: handle,
}
}

/// Stop `BackgroundProcessor`'s thread.
pub fn stop(self) -> Result<(), std::io::Error> {
self.stop_thread.store(true, Ordering::Release);
self.thread_handle.join().unwrap()
}
}

#[cfg(test)]
mod tests {
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
use bitcoin::network::constants::Network;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor;
use lightning::chain::keysinterface::{ChannelKeys, InMemoryChannelKeys, KeysInterface, KeysManager};
use lightning::chain::transaction::OutPoint;
use lightning::get_event_msg;
use lightning::ln::channelmanager::{ChannelManager, SimpleArcChannelManager};
use lightning::ln::features::InitFeatures;
use lightning::ln::msgs::ChannelMessageHandler;
use lightning::util::config::UserConfig;
use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use lightning::util::test_utils;
use lightning_persister::FilesystemPersister;
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use super::BackgroundProcessor;

type ChainMonitor = chainmonitor::ChainMonitor<InMemoryChannelKeys, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

struct Node {
node: SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>,
persister: Arc<FilesystemPersister>,
logger: Arc<test_utils::TestLogger>,
}

impl Drop for Node {
fn drop(&mut self) {
let data_dir = self.persister.get_data_dir();
match fs::remove_dir_all(data_dir.clone()) {
Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
_ => {}
}
}
}

fn get_full_filepath(filepath: String, filename: String) -> String {
let mut path = PathBuf::from(filepath);
path.push(filename);
path.to_str().unwrap().to_string()
}

fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
let mut nodes = Vec::new();
for i in 0..num_nodes {
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
let seed = [i as u8; 32];
let network = Network::Testnet;
let now = Duration::from_secs(genesis_block(network).header.time as u64);
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
let manager = Arc::new(ChannelManager::new(Network::Testnet, fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), i));
let node = Node { node: manager, persister, logger };
nodes.push(node);
}
nodes
}

macro_rules! open_channel {
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
let events = $node_a.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
let (temporary_channel_id, tx, funding_output) = match events[0] {
Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => {
assert_eq!(*channel_value_satoshis, $channel_value);
assert_eq!(user_channel_id, 42);

let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
value: *channel_value_satoshis, script_pubkey: output_script.clone(),
}]};
let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 };
(*temporary_channel_id, tx, funding_outpoint)
},
_ => panic!("Unexpected event"),
};

$node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output);
$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
tx
}}
}

#[test]
fn test_background_processor() {
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
// updates. Also test that when new updates are available, the manager signals that it needs
// re-persistence and is successfully re-persisted.
let nodes = create_nodes(2, "test_background_processor".to_string());

// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
let callback = move |node: &ChannelManager<InMemoryChannelKeys, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());

// Go through the channel creation process until each node should have something persisted.
let tx = open_channel!(nodes[0], nodes[1], 100000);

macro_rules! check_persisted_data {
($node: expr, $filepath: expr, $expected_bytes: expr) => {
match $node.write(&mut $expected_bytes) {
Ok(()) => {
loop {
match std::fs::read($filepath) {
Ok(bytes) => {
if bytes == $expected_bytes {
break
} else {
continue
}
},
Err(_) => continue
}
}
},
Err(e) => panic!("Unexpected error: {}", e)
}
}
}

// Check that the initial channel manager data is persisted as expected.
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
let mut expected_bytes = Vec::new();
check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
}

// Force-close the channel.
nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id()).unwrap();

// Check that the force-close updates are persisted.
let mut expected_bytes = Vec::new();
check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
}

assert!(bg_processor.stop().is_ok());
}

#[test]
fn test_chan_freshness_called() {
// Test that ChannelManager's `timer_chan_freshness_every_min` is called every
// `CHAN_FRESHNESS_TIMER`.
let nodes = create_nodes(1, "test_chan_freshness_called".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let callback = move |node: &ChannelManager<InMemoryChannelKeys, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string();
if log_entries.get(&("background_processor".to_string(), desired_log)).is_some() {
break
}
Comment on lines +266 to +268
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally a failing test would fail-fast rather than hang. Not sure if there's much of an option here, but something that we should keep in mind as it likely means the underlying code could be reworked.

}

assert!(bg_processor.stop().is_ok());
}

#[test]
fn test_persist_error() {
// Test that if we encounter an error during manager persistence, the thread panics.
fn persist_manager<ChanSigner, M, T, K, F, L>(_data: &ChannelManager<ChanSigner, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>) -> Result<(), std::io::Error>
where ChanSigner: 'static + ChannelKeys + Writeable,
M: 'static + chain::Watch<Keys=ChanSigner>,
T: 'static + BroadcasterInterface,
K: 'static + KeysInterface<ChanKeySigner=ChanSigner>,
F: 'static + FeeEstimator,
L: 'static + Logger,
{
Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
}

let nodes = create_nodes(2, "test_persist_error".to_string());
let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone());
open_channel!(nodes[0], nodes[1], 100000);

let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test");
}
}
3 changes: 3 additions & 0 deletions lightning-persister/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ bitcoin = "0.24"
lightning = { version = "0.0.12", path = "../lightning" }
libc = "0.2"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winbase"] }

[dev-dependencies.bitcoin]
version = "0.24"
features = ["bitcoinconsensus"]
Expand Down
Loading