Skip to content

Commit c3f72d8

Browse files
committed
Create a wrapper struct for rapid gossip sync that BackgroundProcessor can reference to detect when the rapid gossip sync has completed.
1 parent 0b77008 commit c3f72d8

File tree

6 files changed

+424
-258
lines changed

6 files changed

+424
-258
lines changed

fuzz/src/process_network_graph.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
// Import that needs to be added manually
1+
// Imports that need to be added manually
2+
use std::sync::Arc;
3+
4+
use lightning_rapid_gossip_sync::RapidGossipSync;
25
use utils::test_logger;
36

47
/// Actual fuzz test, method signature and name are fixed
58
fn do_test(data: &[u8]) {
69
let block_hash = bitcoin::BlockHash::default();
710
let network_graph = lightning::routing::network_graph::NetworkGraph::new(block_hash);
8-
lightning_rapid_gossip_sync::processing::update_network_graph(&network_graph, data);
11+
let rapid_sync = RapidGossipSync::new(Arc::new(network_graph));
12+
rapid_sync.update_network_graph(data);
913
}
1014

1115
/// Method that needs to be added manually, {name}_test

lightning-background-processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1616
[dependencies]
1717
bitcoin = "0.28.1"
1818
lightning = { version = "0.0.106", path = "../lightning", features = ["std"] }
19+
lightning-rapid-gossip-sync = { version = "0.0.106", path = "../lightning-rapid-gossip-sync" }
1920

2021
[dev-dependencies]
2122
lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] }

lightning-background-processor/src/lib.rs

Lines changed: 131 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
1010

1111
#[macro_use] extern crate lightning;
12+
extern crate lightning_rapid_gossip_sync;
1213

1314
use lightning::chain;
1415
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
@@ -28,6 +29,7 @@ use std::thread;
2829
use std::thread::JoinHandle;
2930
use std::time::{Duration, Instant};
3031
use std::ops::Deref;
32+
use lightning_rapid_gossip_sync::RapidGossipSync;
3133

3234
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
3335
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -151,6 +153,13 @@ impl BackgroundProcessor {
151153
/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
152154
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
153155
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
156+
///
157+
/// # Rapid Gossip Sync
158+
///
159+
/// If a rapid gossip sync is meant to run at startup, pass an option with a set
160+
/// [`RapidGossipSync`] reference to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`]
161+
/// not to prune the [`NetworkGraph`] instance until the [`RapidGossipSync`] instance
162+
/// completes its first sync.
154163
pub fn start<
155164
'a,
156165
Signer: 'static + Sign,
@@ -175,9 +184,11 @@ impl BackgroundProcessor {
175184
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
176185
S: 'static + Deref<Target = SC> + Send + Sync,
177186
SC: WriteableScore<'a>,
187+
RGS: 'static + Deref<Target = RapidGossipSync<G>> + Send
178188
>(
179189
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
180-
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>
190+
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>,
191+
rapid_gossip_sync: Option<RGS>
181192
) -> Self
182193
where
183194
CA::Target: 'static + chain::Access,
@@ -273,21 +284,36 @@ impl BackgroundProcessor {
273284
// continuing our normal cadence.
274285
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
275286
if let Some(ref handler) = net_graph_msg_handler {
276-
log_trace!(logger, "Pruning network graph of stale entries");
277-
handler.network_graph().remove_stale_channels();
278-
if let Err(e) = persister.persist_graph(handler.network_graph()) {
279-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
280-
}
281-
}
282-
if let Some(ref scorer) = scorer {
283-
log_trace!(logger, "Persisting scorer");
284-
if let Err(e) = persister.persist_scorer(&scorer) {
285-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
287+
log_trace!(logger, "Assessing prunability of network graph");
288+
289+
// The network graph must not be pruned while rapid sync completion is pending
290+
let should_prune = if let Some(rapid_sync) = rapid_gossip_sync.as_ref() {
291+
rapid_sync.is_initial_sync_complete()
292+
} else {
293+
true
294+
};
295+
296+
if should_prune {
297+
log_trace!(logger, "Pruning network graph of stale entries");
298+
handler.network_graph().remove_stale_channels();
299+
300+
if let Err(e) = persister.persist_graph(handler.network_graph()) {
301+
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
302+
}
303+
304+
last_prune_call = Instant::now();
305+
have_pruned = true;
306+
} else {
307+
log_trace!(logger, "Not pruning network graph due to pending gossip sync");
286308
}
287309
}
310+
}
288311

289-
last_prune_call = Instant::now();
290-
have_pruned = true;
312+
if let Some(ref scorer) = scorer {
313+
log_trace!(logger, "Persisting scorer");
314+
if let Err(e) = persister.persist_scorer(&scorer) {
315+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
316+
}
291317
}
292318
}
293319

@@ -370,7 +396,7 @@ mod tests {
370396
use lightning::chain::transaction::OutPoint;
371397
use lightning::get_event_msg;
372398
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
373-
use lightning::ln::features::InitFeatures;
399+
use lightning::ln::features::{ChannelFeatures, InitFeatures};
374400
use lightning::ln::msgs::{ChannelMessageHandler, Init};
375401
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
376402
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
@@ -385,8 +411,10 @@ mod tests {
385411
use std::fs;
386412
use std::path::PathBuf;
387413
use std::sync::{Arc, Mutex};
414+
use std::sync::mpsc::SyncSender;
388415
use std::time::Duration;
389416
use lightning::routing::scoring::{FixedPenaltyScorer};
417+
use lightning_rapid_gossip_sync::RapidGossipSync;
390418
use super::{BackgroundProcessor, FRESHNESS_TIMER};
391419

392420
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
@@ -428,6 +456,7 @@ mod tests {
428456

429457
struct Persister {
430458
graph_error: Option<(std::io::ErrorKind, &'static str)>,
459+
graph_persistence_notifier: Option<SyncSender<()>>,
431460
manager_error: Option<(std::io::ErrorKind, &'static str)>,
432461
scorer_error: Option<(std::io::ErrorKind, &'static str)>,
433462
filesystem_persister: FilesystemPersister,
@@ -436,13 +465,17 @@ mod tests {
436465
impl Persister {
437466
fn new(data_dir: String) -> Self {
438467
let filesystem_persister = FilesystemPersister::new(data_dir.clone());
439-
Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister }
468+
Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
440469
}
441470

442471
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
443472
Self { graph_error: Some((error, message)), ..self }
444473
}
445474

475+
fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
476+
Self { graph_persistence_notifier: Some(sender), ..self }
477+
}
478+
446479
fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
447480
Self { manager_error: Some((error, message)), ..self }
448481
}
@@ -461,6 +494,10 @@ mod tests {
461494
}
462495

463496
if key == "network_graph" {
497+
if let Some(sender) = &self.graph_persistence_notifier {
498+
sender.send(()).unwrap();
499+
};
500+
464501
if let Some((error, message)) = self.graph_error {
465502
return Err(std::io::Error::new(error, message))
466503
}
@@ -602,7 +639,8 @@ mod tests {
602639
let data_dir = nodes[0].persister.get_data_dir();
603640
let persister = Arc::new(Persister::new(data_dir));
604641
let event_handler = |_: &_| {};
605-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
642+
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
643+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
606644

607645
macro_rules! check_persisted_data {
608646
($node: expr, $filepath: expr) => {
@@ -667,7 +705,8 @@ mod tests {
667705
let data_dir = nodes[0].persister.get_data_dir();
668706
let persister = Arc::new(Persister::new(data_dir));
669707
let event_handler = |_: &_| {};
670-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
708+
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
709+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
671710
loop {
672711
let log_entries = nodes[0].logger.lines.lock().unwrap();
673712
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -690,7 +729,8 @@ mod tests {
690729
let data_dir = nodes[0].persister.get_data_dir();
691730
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
692731
let event_handler = |_: &_| {};
693-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
732+
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
733+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
694734
match bg_processor.join() {
695735
Ok(_) => panic!("Expected error persisting manager"),
696736
Err(e) => {
@@ -707,7 +747,8 @@ mod tests {
707747
let data_dir = nodes[0].persister.get_data_dir();
708748
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
709749
let event_handler = |_: &_| {};
710-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
750+
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
751+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
711752

712753
match bg_processor.stop() {
713754
Ok(_) => panic!("Expected error persisting network graph"),
@@ -725,7 +766,8 @@ mod tests {
725766
let data_dir = nodes[0].persister.get_data_dir();
726767
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
727768
let event_handler = |_: &_| {};
728-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
769+
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
770+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
729771

730772
match bg_processor.stop() {
731773
Ok(_) => panic!("Expected error persisting scorer"),
@@ -748,7 +790,8 @@ mod tests {
748790
let event_handler = move |event: &Event| {
749791
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
750792
};
751-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
793+
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
794+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
752795

753796
// Open a channel and check that the FundingGenerationReady event was handled.
754797
begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -773,7 +816,8 @@ mod tests {
773816
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
774817
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
775818
let persister = Arc::new(Persister::new(data_dir));
776-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
819+
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
820+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
777821

778822
// Force close the channel and check that the SpendableOutputs event was handled.
779823
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
@@ -791,6 +835,69 @@ mod tests {
791835
assert!(bg_processor.stop().is_ok());
792836
}
793837

838+
#[test]
839+
fn test_not_pruning_network_graph_until_graph_sync_completion() {
840+
let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
841+
let data_dir = nodes[0].persister.get_data_dir();
842+
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
843+
let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
844+
let network_graph = nodes[0].network_graph.clone();
845+
let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
846+
let features = ChannelFeatures::empty();
847+
network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
848+
.expect("Failed to update channel from partial announcement");
849+
let original_graph_description = network_graph.to_string();
850+
assert!(original_graph_description.contains("42: features: 0000, node_one:"));
851+
assert_eq!(network_graph.read_only().channels().len(), 1);
852+
853+
let event_handler = |_: &_| {};
854+
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), Some(rapid_sync.clone()));
855+
856+
loop {
857+
let log_entries = nodes[0].logger.lines.lock().unwrap();
858+
let expected_log_a = "Assessing prunability of network graph".to_string();
859+
let expected_log_b = "Not pruning network graph due to pending gossip sync".to_string();
860+
if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
861+
log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
862+
break
863+
}
864+
}
865+
866+
let initialization_input = vec![
867+
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
868+
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
869+
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
870+
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
871+
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
872+
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
873+
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
874+
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
875+
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
876+
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
877+
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
878+
0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
879+
0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
880+
];
881+
rapid_sync.update_network_graph(&initialization_input[..]).unwrap();
882+
883+
// this should have added two channels
884+
assert_eq!(network_graph.read_only().channels().len(), 3);
885+
886+
let _ = receiver
887+
.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 2))
888+
.expect("Network graph not pruned within deadline");
889+
let current_graph_description = network_graph.to_string();
890+
891+
background_processor.stop().unwrap();
892+
893+
assert_ne!(current_graph_description, original_graph_description);
894+
assert!(!current_graph_description.contains("node_one:"));
895+
assert!(!current_graph_description.contains("node_two:"));
896+
897+
// all channels should now be pruned
898+
assert_eq!(network_graph.read_only().channels().len(), 0);
899+
}
900+
794901
#[test]
795902
fn test_invoice_payer() {
796903
let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
@@ -803,7 +910,8 @@ mod tests {
803910
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
804911
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
805912
let event_handler = Arc::clone(&invoice_payer);
806-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
913+
let rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>> = None;
914+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), rapid_gossip_sync);
807915
assert!(bg_processor.stop().is_ok());
808916
}
809917
}

0 commit comments

Comments (0)