Skip to content

Commit f8580fd

Browse files
committed
Use optional rapid sync wrapper in background processor.
1 parent 8df6fbb commit f8580fd

File tree

3 files changed

+53
-26
lines changed

3 files changed

+53
-26
lines changed

lightning-background-processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ lightning = { version = "0.0.106", path = "../lightning", features = ["std"] }
2121
lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] }
2222
lightning-invoice = { version = "0.14.0", path = "../lightning-invoice" }
2323
lightning-persister = { version = "0.0.106", path = "../lightning-persister" }
24+
lightning-rapid-gossip-sync = { version = "0.0.106", path = "../lightning-rapid-gossip-sync" }

lightning-background-processor/src/lib.rs

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
1010

1111
#[macro_use] extern crate lightning;
12+
extern crate lightning_rapid_gossip_sync;
1213

1314
use lightning::chain;
1415
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
@@ -28,6 +29,7 @@ use std::thread;
2829
use std::thread::JoinHandle;
2930
use std::time::{Duration, Instant};
3031
use std::ops::Deref;
32+
use lightning_rapid_gossip_sync::RapidGossipSync;
3133

3234
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
3335
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -56,8 +58,7 @@ use std::ops::Deref;
5658
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
5759
pub struct BackgroundProcessor {
5860
stop_thread: Arc<AtomicBool>,
59-
thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
60-
is_awaiting_rapid_sync_completion: Arc<AtomicBool>
61+
thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>
6162
}
6263

6364
#[cfg(not(test))]
@@ -184,7 +185,8 @@ impl BackgroundProcessor {
184185
SC: WriteableScore<'a>,
185186
>(
186187
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
187-
net_graph_msg_handler: Option<NG>, peer_manager: PM, await_rapid_gossip_sync_completion: bool, logger: L, scorer: Option<S>
188+
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>,
189+
rapid_gossip_sync: Option<Arc<RapidGossipSync<G>>>
188190
) -> Self
189191
where
190192
CA::Target: 'static + chain::Access,
@@ -202,8 +204,11 @@ impl BackgroundProcessor {
202204
{
203205
let stop_thread = Arc::new(AtomicBool::new(false));
204206
let stop_thread_clone = stop_thread.clone();
205-
let is_awaiting_graph_sync_completion = Arc::new(AtomicBool::new(await_rapid_gossip_sync_completion));
206-
let is_awaiting_graph_sync_completion_clone = is_awaiting_graph_sync_completion.clone();
207+
let is_initial_sync_complete = if let Some(rapid_sync) = rapid_gossip_sync {
208+
rapid_sync.is_initial_sync_complete.clone()
209+
} else {
210+
Arc::new(AtomicBool::new(true))
211+
};
207212
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
208213
let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler: net_graph_msg_handler.as_ref().map(|t| t.deref()) };
209214

@@ -284,7 +289,7 @@ impl BackgroundProcessor {
284289
if let Some(ref handler) = net_graph_msg_handler {
285290
log_trace!(logger, "Assessing prunability of network graph");
286291
// The network graph must not be pruned while rapid sync completion is pending
287-
let is_currently_awaiting_graph_sync = is_awaiting_graph_sync_completion_clone.load(Ordering::Acquire);
292+
let is_currently_awaiting_graph_sync = !is_initial_sync_complete.load(Ordering::Acquire);
288293
if is_currently_awaiting_graph_sync {
289294
log_trace!(logger, "Not pruning network graph due to pending gossip sync");
290295
continue;
@@ -324,7 +329,7 @@ impl BackgroundProcessor {
324329

325330
Ok(())
326331
});
327-
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle), is_awaiting_rapid_sync_completion: is_awaiting_graph_sync_completion }
332+
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
328333
}
329334

330335
/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
@@ -355,14 +360,6 @@ impl BackgroundProcessor {
355360
self.stop_and_join_thread()
356361
}
357362

358-
/// Signal to [`BackgroundProcessor`] that the initial rapid gossip sync has completed.
359-
///
360-
/// This function can only be called usefully once, so there is an implicit understanding
361-
/// that running rapid sync multiple times after startup is API misuse.
362-
pub fn rapid_gossip_sync_complete(&self) {
363-
self.is_awaiting_rapid_sync_completion.store(false, Ordering::Release)
364-
}
365-
366363
fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
367364
self.stop_thread.store(true, Ordering::Release);
368365
self.join_thread()
@@ -412,6 +409,7 @@ mod tests {
412409
use std::sync::mpsc::SyncSender;
413410
use std::time::Duration;
414411
use lightning::routing::scoring::{FixedPenaltyScorer};
412+
use lightning_rapid_gossip_sync::RapidGossipSync;
415413
use super::{BackgroundProcessor, FRESHNESS_TIMER};
416414

417415
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
@@ -636,7 +634,7 @@ mod tests {
636634
let data_dir = nodes[0].persister.get_data_dir();
637635
let persister = Arc::new(Persister::new(data_dir));
638636
let event_handler = |_: &_| {};
639-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
637+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), None);
640638

641639
macro_rules! check_persisted_data {
642640
($node: expr, $filepath: expr) => {
@@ -701,7 +699,7 @@ mod tests {
701699
let data_dir = nodes[0].persister.get_data_dir();
702700
let persister = Arc::new(Persister::new(data_dir));
703701
let event_handler = |_: &_| {};
704-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
702+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), None);
705703
loop {
706704
let log_entries = nodes[0].logger.lines.lock().unwrap();
707705
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -724,7 +722,7 @@ mod tests {
724722
let data_dir = nodes[0].persister.get_data_dir();
725723
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
726724
let event_handler = |_: &_| {};
727-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
725+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), None);
728726
match bg_processor.join() {
729727
Ok(_) => panic!("Expected error persisting manager"),
730728
Err(e) => {
@@ -741,7 +739,7 @@ mod tests {
741739
let data_dir = nodes[0].persister.get_data_dir();
742740
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
743741
let event_handler = |_: &_| {};
744-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
742+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), None);
745743

746744
match bg_processor.stop() {
747745
Ok(_) => panic!("Expected error persisting network graph"),
@@ -759,7 +757,7 @@ mod tests {
759757
let data_dir = nodes[0].persister.get_data_dir();
760758
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
761759
let event_handler = |_: &_| {};
762-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
760+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), None);
763761

764762
match bg_processor.stop() {
765763
Ok(_) => panic!("Expected error persisting scorer"),
@@ -782,7 +780,7 @@ mod tests {
782780
let event_handler = move |event: &Event| {
783781
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
784782
};
785-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
783+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), None);
786784

787785
// Open a channel and check that the FundingGenerationReady event was handled.
788786
begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -807,7 +805,7 @@ mod tests {
807805
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
808806
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
809807
let persister = Arc::new(Persister::new(data_dir));
810-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
808+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), None);
811809

812810
// Force close the channel and check that the SpendableOutputs event was handled.
813811
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
@@ -832,14 +830,16 @@ mod tests {
832830
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
833831
let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
834832
let network_graph = nodes[0].network_graph.clone();
833+
let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
835834
let features = ChannelFeatures::empty();
836835
network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
837836
.expect("Failed to update channel from partial announcement");
838837
let original_graph_description = network_graph.to_string();
839838
assert!(original_graph_description.contains("42: features: 0000, node_one:"));
839+
assert_eq!(network_graph.read_only().channels().len(), 1);
840840

841841
let event_handler = |_: &_| {};
842-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), true, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
842+
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), Some(rapid_sync.clone()));
843843

844844
loop {
845845
let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -851,16 +851,39 @@ mod tests {
851851
}
852852
}
853853

854-
bg_processor.rapid_gossip_sync_complete();
854+
let initialization_input = vec![
855+
76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
856+
79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
857+
0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
858+
187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
859+
157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
860+
88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
861+
204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
862+
181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
863+
110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
864+
76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
865+
226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
866+
0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
867+
0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
868+
];
869+
rapid_sync.update_network_graph(&initialization_input[..]).unwrap();
870+
871+
// this should have added two channels
872+
assert_eq!(network_graph.read_only().channels().len(), 3);
855873

856874
let _ = receiver
857875
.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 2))
858876
.expect("Network graph not pruned within deadline");
859877
let current_graph_description = network_graph.to_string();
878+
879+
background_processor.stop().unwrap();
880+
860881
assert_ne!(current_graph_description, original_graph_description);
861-
assert_eq!(current_graph_description.len(), 31);
862882
assert!(!current_graph_description.contains("node_one:"));
863883
assert!(!current_graph_description.contains("node_two:"));
884+
885+
// all channels should now be pruned
886+
assert_eq!(network_graph.read_only().channels().len(), 0);
864887
}
865888

866889
#[test]
@@ -875,7 +898,7 @@ mod tests {
875898
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
876899
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
877900
let event_handler = Arc::clone(&invoice_payer);
878-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
901+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), None);
879902
assert!(bg_processor.stop().is_ok());
880903
}
881904
}

lightning-rapid-gossip-sync/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,16 @@ pub mod error;
7272
pub mod processing;
7373

7474
/// Rapid Gossip Sync holder struct
75+
/// todo: ask reviewers for more interesting things to say here
7576
pub struct RapidGossipSync<NG: Deref<Target=NetworkGraph>> {
7677
network_graph: NG,
7778
/// Atomic bool indicating whether a rapid gossip sync has completed at least once
7879
pub is_initial_sync_complete: Arc<AtomicBool>
7980
}
8081

8182
impl<NG: Deref<Target=NetworkGraph>> RapidGossipSync<NG> {
83+
/// Instantiate a new RapidGossipSync holder
84+
/// todo: same as above
8285
pub fn new(network_graph: NG) -> Self {
8386
Self {
8487
network_graph,

0 commit comments

Comments
 (0)