Skip to content

Commit 1f5c0e1

Browse files
committed
Create separate timer for scorer persistence in background processor.
1 parent c3f72d8 commit 1f5c0e1

File tree

4 files changed

+76
-66
lines changed

4 files changed

+76
-66
lines changed

fuzz/src/process_network_graph.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
// Imports that need to be added manually
2-
use std::sync::Arc;
3-
42
use lightning_rapid_gossip_sync::RapidGossipSync;
53
use utils::test_logger;
64

75
/// Actual fuzz test, method signature and name are fixed
86
fn do_test(data: &[u8]) {
97
let block_hash = bitcoin::BlockHash::default();
108
let network_graph = lightning::routing::network_graph::NetworkGraph::new(block_hash);
11-
let rapid_sync = RapidGossipSync::new(Arc::new(network_graph));
9+
let rapid_sync = RapidGossipSync::new(&network_graph);
1210
rapid_sync.update_network_graph(data);
1311
}
1412

lightning-background-processor/src/lib.rs

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ const PING_TIMER: u64 = 1;
7979
/// Prune the network graph of stale entries hourly.
8080
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
8181

82+
#[cfg(all(not(test), debug_assertions))]
83+
const SCORER_PERSIST_TIMER: u64 = 30;
84+
#[cfg(test)]
85+
const SCORER_PERSIST_TIMER: u64 = 1;
86+
8287
#[cfg(not(test))]
8388
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
8489
#[cfg(test)]
@@ -144,6 +149,12 @@ impl BackgroundProcessor {
144149
/// functionality implemented by other handlers.
145150
/// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures.
146151
///
152+
/// # Rapid Gossip Sync
153+
///
154+
/// If a rapid gossip sync is meant to run at startup, pass an optional [`RapidGossipSync`]
155+
/// to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`] not to prune the
156+
/// [`NetworkGraph`] instance until the [`RapidGossipSync`] instance completes its first sync.
157+
///
147158
/// [top-level documentation]: BackgroundProcessor
148159
/// [`join`]: Self::join
149160
/// [`stop`]: Self::stop
@@ -153,13 +164,6 @@ impl BackgroundProcessor {
153164
/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
154165
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
155166
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
156-
///
157-
/// # Rapid Gossip Sync
158-
///
159-
/// If a rapid gossip sync is meant to run at startup, pass an option with a set
160-
/// [`RapidGossipSync`] reference to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`]
161-
/// not to prune the [`NetworkGraph`] instance until the [`RapidGossipSync`] instance
162-
/// completes its first sync.
163167
pub fn start<
164168
'a,
165169
Signer: 'static + Sign,
@@ -215,6 +219,7 @@ impl BackgroundProcessor {
215219
let mut last_freshness_call = Instant::now();
216220
let mut last_ping_call = Instant::now();
217221
let mut last_prune_call = Instant::now();
222+
let mut last_scorer_persist_call = Instant::now();
218223
let mut have_pruned = false;
219224

220225
loop {
@@ -309,11 +314,14 @@ impl BackgroundProcessor {
309314
}
310315
}
311316

312-
if let Some(ref scorer) = scorer {
313-
log_trace!(logger, "Persisting scorer");
314-
if let Err(e) = persister.persist_scorer(&scorer) {
315-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
317+
if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
318+
if let Some(ref scorer) = scorer {
319+
log_trace!(logger, "Persisting scorer");
320+
if let Err(e) = persister.persist_scorer(&scorer) {
321+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
322+
}
316323
}
324+
last_scorer_persist_call = Instant::now();
317325
}
318326
}
319327

@@ -442,6 +450,7 @@ mod tests {
442450
logger: Arc<test_utils::TestLogger>,
443451
best_block: BestBlock,
444452
scorer: Arc<Mutex<FixedPenaltyScorer>>,
453+
rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>>
445454
}
446455

447456
impl Drop for Node {
@@ -541,7 +550,8 @@ mod tests {
541550
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
542551
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
543552
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
544-
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
553+
let rapid_gossip_sync = None;
554+
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer, rapid_gossip_sync };
545555
nodes.push(node);
546556
}
547557

@@ -639,8 +649,7 @@ mod tests {
639649
let data_dir = nodes[0].persister.get_data_dir();
640650
let persister = Arc::new(Persister::new(data_dir));
641651
let event_handler = |_: &_| {};
642-
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
643-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
652+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
644653

645654
macro_rules! check_persisted_data {
646655
($node: expr, $filepath: expr) => {
@@ -705,8 +714,7 @@ mod tests {
705714
let data_dir = nodes[0].persister.get_data_dir();
706715
let persister = Arc::new(Persister::new(data_dir));
707716
let event_handler = |_: &_| {};
708-
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
709-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
717+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
710718
loop {
711719
let log_entries = nodes[0].logger.lines.lock().unwrap();
712720
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -729,8 +737,7 @@ mod tests {
729737
let data_dir = nodes[0].persister.get_data_dir();
730738
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
731739
let event_handler = |_: &_| {};
732-
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
733-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
740+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
734741
match bg_processor.join() {
735742
Ok(_) => panic!("Expected error persisting manager"),
736743
Err(e) => {
@@ -747,8 +754,7 @@ mod tests {
747754
let data_dir = nodes[0].persister.get_data_dir();
748755
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
749756
let event_handler = |_: &_| {};
750-
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
751-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
757+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
752758

753759
match bg_processor.stop() {
754760
Ok(_) => panic!("Expected error persisting network graph"),
@@ -766,8 +772,7 @@ mod tests {
766772
let data_dir = nodes[0].persister.get_data_dir();
767773
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
768774
let event_handler = |_: &_| {};
769-
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
770-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
775+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
771776

772777
match bg_processor.stop() {
773778
Ok(_) => panic!("Expected error persisting scorer"),
@@ -790,8 +795,7 @@ mod tests {
790795
let event_handler = move |event: &Event| {
791796
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
792797
};
793-
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
794-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
798+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
795799

796800
// Open a channel and check that the FundingGenerationReady event was handled.
797801
begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -816,8 +820,7 @@ mod tests {
816820
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
817821
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
818822
let persister = Arc::new(Persister::new(data_dir));
819-
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
820-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
823+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
821824

822825
// Force close the channel and check that the SpendableOutputs event was handled.
823826
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
@@ -835,6 +838,25 @@ mod tests {
835838
assert!(bg_processor.stop().is_ok());
836839
}
837840

841+
#[test]
842+
fn test_scorer_persistence() {
843+
let nodes = create_nodes(2, "test_scorer_persistence".to_string());
844+
let data_dir = nodes[0].persister.get_data_dir();
845+
let persister = Arc::new(Persister::new(data_dir));
846+
let event_handler = |_: &_| {};
847+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
848+
849+
loop {
850+
let log_entries = nodes[0].logger.lines.lock().unwrap();
851+
let expected_log = "Persisting scorer".to_string();
852+
if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
853+
break
854+
}
855+
}
856+
857+
assert!(bg_processor.stop().is_ok());
858+
}
859+
838860
#[test]
839861
fn test_not_pruning_network_graph_until_graph_sync_completion() {
840862
let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
@@ -886,14 +908,9 @@ mod tests {
886908
let _ = receiver
887909
.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 2))
888910
.expect("Network graph not pruned within deadline");
889-
let current_graph_description = network_graph.to_string();
890911

891912
background_processor.stop().unwrap();
892913

893-
assert_ne!(current_graph_description, original_graph_description);
894-
assert!(!current_graph_description.contains("node_one:"));
895-
assert!(!current_graph_description.contains("node_two:"));
896-
897914
// all channels should now be pruned
898915
assert_eq!(network_graph.read_only().channels().len(), 0);
899916
}
@@ -910,8 +927,7 @@ mod tests {
910927
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
911928
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
912929
let event_handler = Arc::clone(&invoice_payer);
913-
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
914-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
930+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
915931
assert!(bg_processor.stop().is_ok());
916932
}
917933
}

lightning-rapid-gossip-sync/src/lib.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,14 @@
2727
//! its contents from disk, which we do by calling the `sync_network_graph_with_file_path` method:
2828
//!
2929
//! ```
30-
//! use std::sync::Arc;
3130
//! use bitcoin::blockdata::constants::genesis_block;
3231
//! use bitcoin::Network;
3332
//! use lightning::routing::network_graph::NetworkGraph;
3433
//! use lightning_rapid_gossip_sync::RapidGossipSync;
3534
//!
3635
//! let block_hash = genesis_block(Network::Bitcoin).header.block_hash();
37-
//! let network_graph = Arc::new(NetworkGraph::new(block_hash));
38-
//! let rapid_sync = RapidGossipSync::new(network_graph);
36+
//! let network_graph = NetworkGraph::new(block_hash);
37+
//! let rapid_sync = RapidGossipSync::new(&network_graph);
3938
//! let new_last_sync_timestamp_result = rapid_sync.sync_network_graph_with_file_path("./rapid_sync.lngossip");
4039
//! ```
4140
//!
@@ -61,7 +60,6 @@ extern crate test;
6160

6261
use std::fs::File;
6362
use std::ops::Deref;
64-
use std::sync::Arc;
6563
use std::sync::atomic::{AtomicBool, Ordering};
6664

6765
use lightning::routing::network_graph::NetworkGraph;
@@ -75,19 +73,20 @@ pub mod error;
7573
pub mod processing;
7674

7775
/// Rapid Gossip Sync holder struct
78-
/// todo: ask reviewers for more interesting things to say here
76+
/// See [module-level documentation] for usage.
77+
///
78+
/// [module-level documentation]: crate
7979
pub struct RapidGossipSync<NG: Deref<Target=NetworkGraph>> {
8080
network_graph: NG,
81-
is_initial_sync_complete: Arc<AtomicBool>
81+
is_initial_sync_complete: AtomicBool
8282
}
8383

8484
impl<NG: Deref<Target=NetworkGraph>> RapidGossipSync<NG> {
85-
/// Instantiate a new RapidGossipSync holder
86-
/// todo: same as above
85+
/// Instantiate a new RapidGossipSync instance
8786
pub fn new(network_graph: NG) -> Self {
8887
Self {
8988
network_graph,
90-
is_initial_sync_complete: Arc::new(AtomicBool::new(false))
89+
is_initial_sync_complete: AtomicBool::new(false)
9190
}
9291
}
9392

@@ -106,7 +105,7 @@ impl<NG: Deref<Target=NetworkGraph>> RapidGossipSync<NG> {
106105
self.update_network_graph_from_byte_stream(&mut file)
107106
}
108107

109-
/// Bool indicating whether a rapid gossip sync has completed at least once
108+
/// Returns whether a rapid gossip sync has completed at least once
110109
pub fn is_initial_sync_complete(&self) -> bool {
111110
self.is_initial_sync_complete.load(Ordering::Acquire)
112111
}
@@ -115,7 +114,6 @@ impl<NG: Deref<Target=NetworkGraph>> RapidGossipSync<NG> {
115114
#[cfg(test)]
116115
mod tests {
117116
use std::fs;
118-
use std::sync::Arc;
119117

120118
use bitcoin::blockdata::constants::genesis_block;
121119
use bitcoin::Network;
@@ -181,11 +179,11 @@ mod tests {
181179
let graph_sync_test_file = sync_test.get_test_file_path();
182180

183181
let block_hash = genesis_block(Network::Bitcoin).block_hash();
184-
let network_graph = Arc::new(NetworkGraph::new(block_hash));
182+
let network_graph = NetworkGraph::new(block_hash);
185183

186184
assert_eq!(network_graph.read_only().channels().len(), 0);
187185

188-
let rapid_sync = RapidGossipSync::new(network_graph.clone());
186+
let rapid_sync = RapidGossipSync::new(&network_graph);
189187
let sync_result = rapid_sync.sync_network_graph_with_file_path(&graph_sync_test_file);
190188

191189
if sync_result.is_err() {
@@ -217,12 +215,12 @@ mod tests {
217215

218216
assert_eq!(network_graph.read_only().channels().len(), 0);
219217

220-
let rapid_sync = RapidGossipSync::new(Arc::new(network_graph));
218+
let rapid_sync = RapidGossipSync::new(&network_graph);
221219
let start = std::time::Instant::now();
222220
let sync_result = rapid_sync
223221
.sync_network_graph_with_file_path("./res/full_graph.lngossip");
224222
if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
225-
let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
223+
let error_string = format!("Input file lightning-rapid-gossip-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
226224
#[cfg(not(require_route_graph_test))]
227225
{
228226
println!("{}", error_string);
@@ -241,7 +239,6 @@ mod tests {
241239

242240
#[cfg(all(test, feature = "_bench_unstable"))]
243241
pub mod bench {
244-
use std::sync::Arc;
245242
use test::Bencher;
246243

247244
use bitcoin::blockdata::constants::genesis_block;
@@ -256,11 +253,11 @@ pub mod bench {
256253
fn bench_reading_full_graph_from_file(b: &mut Bencher) {
257254
let block_hash = genesis_block(Network::Bitcoin).block_hash();
258255
b.iter(|| {
259-
let network_graph = Arc::new(NetworkGraph::new(block_hash));
260-
let rapid_sync = RapidGossipSync::new(network_graph);
256+
let network_graph = NetworkGraph::new(block_hash);
257+
let rapid_sync = RapidGossipSync::new(&network_graph);
261258
let sync_result = rapid_sync.sync_network_graph_with_file_path("./res/full_graph.lngossip");
262259
if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
263-
let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
260+
let error_string = format!("Input file lightning-rapid-gossip-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
264261
#[cfg(not(require_route_graph_test))]
265262
{
266263
println!("{}", error_string);

0 commit comments

Comments
 (0)