Skip to content

Commit f698cf9

Browse files
committed
Create an atomic bool to signal graph sync completion to BackgroundProcessor.
1 parent aac3907 commit f698cf9

File tree

1 file changed

+84
-12
lines changed
  • lightning-background-processor/src

1 file changed

+84
-12
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use std::ops::Deref;
5757
pub struct BackgroundProcessor {
5858
/// Stop flag; `stop_and_join_thread` stores `true` into it before joining the thread.
stop_thread: Arc<AtomicBool>,
5959
/// Join handle of the thread spawned by `start`; the thread's result carries any
/// `std::io::Error` it returned.
thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
60+
/// Initialized from the `await_graph_sync_completion` argument of `start`. While `true`, the
/// background thread skips pruning the network graph; `graph_sync_complete` stores `false`.
is_awaiting_graph_sync_completion: Arc<AtomicBool>
6061
}
6162

6263
#[cfg(not(test))]
@@ -151,6 +152,12 @@ impl BackgroundProcessor {
151152
/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
152153
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
153154
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
155+
///
156+
/// # Graph Sync
157+
///
158+
/// If a rapid graph sync is meant to run at startup, set `await_graph_sync_completion` to true
159+
/// to indicate to [`BackgroundProcessor`] not to prune the [`NetworkGraph`] instance until
160+
/// [`BackgroundProcessor::graph_sync_complete`] is called.
154161
pub fn start<
155162
'a,
156163
Signer: 'static + Sign,
@@ -177,7 +184,7 @@ impl BackgroundProcessor {
177184
SC: WriteableScore<'a>,
178185
>(
179186
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
180-
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>
187+
net_graph_msg_handler: Option<NG>, peer_manager: PM, await_graph_sync_completion: bool, logger: L, scorer: Option<S>
181188
) -> Self
182189
where
183190
CA::Target: 'static + chain::Access,
@@ -195,6 +202,8 @@ impl BackgroundProcessor {
195202
{
196203
let stop_thread = Arc::new(AtomicBool::new(false));
197204
let stop_thread_clone = stop_thread.clone();
205+
let is_awaiting_graph_sync_completion = Arc::new(AtomicBool::new(await_graph_sync_completion));
206+
let is_awaiting_graph_sync_completion_clone = is_awaiting_graph_sync_completion.clone();
198207
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
199208
let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler: net_graph_msg_handler.as_ref().map(|t| t.deref()) };
200209

@@ -273,6 +282,13 @@ impl BackgroundProcessor {
273282
// continuing our normal cadence.
274283
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
275284
if let Some(ref handler) = net_graph_msg_handler {
285+
log_trace!(logger, "Assessing prunability of network graph");
286+
// The network graph must not be pruned while graph sync completion is pending
287+
let is_currently_awaiting_graph_sync = is_awaiting_graph_sync_completion_clone.load(Ordering::Acquire);
288+
if is_currently_awaiting_graph_sync {
289+
log_trace!(logger, "Not pruning network graph due to pending graph sync");
290+
continue;
291+
}
276292
log_trace!(logger, "Pruning network graph of stale entries");
277293
handler.network_graph().remove_stale_channels();
278294
if let Err(e) = persister.persist_graph(handler.network_graph()) {
@@ -308,7 +324,7 @@ impl BackgroundProcessor {
308324

309325
Ok(())
310326
});
311-
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
327+
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle), is_awaiting_graph_sync_completion }
312328
}
313329

314330
/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
@@ -339,6 +355,14 @@ impl BackgroundProcessor {
339355
self.stop_and_join_thread()
340356
}
341357

358+
/// Signal to [`BackgroundProcessor`] that the initial rapid graph sync has completed.
359+
///
360+
/// This function can only be called usefully once, so there is an implicit understanding
361+
/// that running graph sync multiple times after startup is API misuse.
///
/// Clears the flag that was initialized from the `await_graph_sync_completion` argument of
/// [`BackgroundProcessor::start`]. Once it is cleared, the background thread's prune check no
/// longer skips pruning the network graph.
362+
pub fn graph_sync_complete(&self) {
363+
// This `Release` store pairs with the `Acquire` load the background thread performs
// before deciding whether to prune.
self.is_awaiting_graph_sync_completion.store(false, Ordering::Release)
364+
}
365+
342366
fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
343367
self.stop_thread.store(true, Ordering::Release);
344368
self.join_thread()
@@ -370,7 +394,7 @@ mod tests {
370394
use lightning::chain::transaction::OutPoint;
371395
use lightning::get_event_msg;
372396
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
373-
use lightning::ln::features::InitFeatures;
397+
use lightning::ln::features::{ChannelFeatures, InitFeatures};
374398
use lightning::ln::msgs::{ChannelMessageHandler, Init};
375399
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
376400
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
@@ -385,6 +409,7 @@ mod tests {
385409
use std::fs;
386410
use std::path::PathBuf;
387411
use std::sync::{Arc, Mutex};
412+
use std::sync::mpsc::SyncSender;
388413
use std::time::Duration;
389414
use lightning::routing::scoring::{FixedPenaltyScorer};
390415
use super::{BackgroundProcessor, FRESHNESS_TIMER};
@@ -428,6 +453,7 @@ mod tests {
428453

429454
struct Persister {
430455
graph_error: Option<(std::io::ErrorKind, &'static str)>,
456+
graph_persistence_notifier: Option<SyncSender<()>>,
431457
manager_error: Option<(std::io::ErrorKind, &'static str)>,
432458
scorer_error: Option<(std::io::ErrorKind, &'static str)>,
433459
filesystem_persister: FilesystemPersister,
@@ -436,13 +462,17 @@ mod tests {
436462
impl Persister {
437463
/// Builds a test persister backed by a `FilesystemPersister` created for `data_dir`, with
/// every optional error/notifier field left as `None`.
fn new(data_dir: String) -> Self {
438464
let filesystem_persister = FilesystemPersister::new(data_dir.clone());
439-
Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister }
465+
Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
440466
}
441467

442468
/// Returns this persister with `graph_error` set, so that handling the `"network_graph"` key
/// returns an `std::io::Error` built from `error` and `message`.
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
443469
Self { graph_error: Some((error, message)), ..self }
444470
}
445471

472+
/// Returns this persister with `sender` installed as the graph-persistence notifier; a `()`
/// is sent on it whenever the `"network_graph"` key is handled (before any injected graph
/// error is returned).
fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
473+
Self { graph_persistence_notifier: Some(sender), ..self }
474+
}
475+
446476
/// Returns this persister with `manager_error` set to the given kind and message; the tests
/// use it to expect a failure when persisting the manager.
fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
447477
Self { manager_error: Some((error, message)), ..self }
448478
}
@@ -461,6 +491,10 @@ mod tests {
461491
}
462492

463493
if key == "network_graph" {
494+
if let Some(sender) = &self.graph_persistence_notifier {
495+
sender.send(()).unwrap();
496+
};
497+
464498
if let Some((error, message)) = self.graph_error {
465499
return Err(std::io::Error::new(error, message))
466500
}
@@ -602,7 +636,7 @@ mod tests {
602636
let data_dir = nodes[0].persister.get_data_dir();
603637
let persister = Arc::new(Persister::new(data_dir));
604638
let event_handler = |_: &_| {};
605-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
639+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
606640

607641
macro_rules! check_persisted_data {
608642
($node: expr, $filepath: expr) => {
@@ -667,7 +701,7 @@ mod tests {
667701
let data_dir = nodes[0].persister.get_data_dir();
668702
let persister = Arc::new(Persister::new(data_dir));
669703
let event_handler = |_: &_| {};
670-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
704+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
671705
loop {
672706
let log_entries = nodes[0].logger.lines.lock().unwrap();
673707
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -690,7 +724,7 @@ mod tests {
690724
let data_dir = nodes[0].persister.get_data_dir();
691725
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
692726
let event_handler = |_: &_| {};
693-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
727+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
694728
match bg_processor.join() {
695729
Ok(_) => panic!("Expected error persisting manager"),
696730
Err(e) => {
@@ -707,7 +741,7 @@ mod tests {
707741
let data_dir = nodes[0].persister.get_data_dir();
708742
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
709743
let event_handler = |_: &_| {};
710-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
744+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
711745

712746
match bg_processor.stop() {
713747
Ok(_) => panic!("Expected error persisting network graph"),
@@ -725,7 +759,7 @@ mod tests {
725759
let data_dir = nodes[0].persister.get_data_dir();
726760
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
727761
let event_handler = |_: &_| {};
728-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
762+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
729763

730764
match bg_processor.stop() {
731765
Ok(_) => panic!("Expected error persisting scorer"),
@@ -748,7 +782,7 @@ mod tests {
748782
let event_handler = move |event: &Event| {
749783
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
750784
};
751-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
785+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
752786

753787
// Open a channel and check that the FundingGenerationReady event was handled.
754788
begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -773,7 +807,7 @@ mod tests {
773807
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
774808
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
775809
let persister = Arc::new(Persister::new(data_dir));
776-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
810+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
777811

778812
// Force close the channel and check that the SpendableOutputs event was handled.
779813
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
@@ -791,6 +825,44 @@ mod tests {
791825
assert!(bg_processor.stop().is_ok());
792826
}
793827

828+
// Checks that the background processor does not prune the network graph while a graph sync is
// flagged as pending, and that the graph changes after `graph_sync_complete` is called.
#[test]
829+
fn test_not_pruning_network_graph_until_graph_sync_completion() {
830+
let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
831+
let data_dir = nodes[0].persister.get_data_dir();
832+
// The persister sends `()` on this channel each time the network graph key is persisted.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
833+
let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
834+
let network_graph = nodes[0].network_graph.clone();
835+
let features = ChannelFeatures::empty();
836+
// Seed the graph with one channel, keyed as 42 in the description asserted below.
// NOTE(review): 53 is presumably a timestamp old enough to make the channel stale; confirm
// against `add_channel_from_partial_announcement`.
network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
837+
.expect("Failed to update channel from partial announcement");
838+
let original_graph_description = network_graph.to_string();
839+
assert!(original_graph_description.contains("42: features: 0000, node_one:"));
840+
841+
let event_handler = |_: &_| {};
842+
// `true` is `await_graph_sync_completion`: start with pruning suppressed.
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), true, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
843+
844+
// Spin until both log lines show the prune check ran and was skipped for the pending sync.
loop {
845+
let log_entries = nodes[0].logger.lines.lock().unwrap();
846+
let expected_log_a = "Assessing prunability of network graph".to_string();
847+
let expected_log_b = "Not pruning network graph due to pending graph sync".to_string();
848+
if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
849+
log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
850+
break
851+
}
852+
}
853+
854+
// Clear the pending-sync flag so the next prune check is allowed to proceed.
bg_processor.graph_sync_complete();
855+
856+
// Wait for a graph-persistence notification, allowing up to twice the first-prune timer.
let _ = receiver
857+
.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 2))
858+
.expect("Network graph not pruned within deadline");
859+
// The description must have changed and no longer mention the seeded channel's endpoints.
let current_graph_description = network_graph.to_string();
860+
assert_ne!(current_graph_description, original_graph_description);
861+
// NOTE(review): 31 is presumably the length of an empty graph's description; confirm.
assert_eq!(current_graph_description.len(), 31);
862+
assert!(!current_graph_description.contains("node_one:"));
863+
assert!(!current_graph_description.contains("node_two:"));
864+
}
865+
794866
#[test]
795867
fn test_invoice_payer() {
796868
let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
@@ -803,7 +875,7 @@ mod tests {
803875
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
804876
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
805877
let event_handler = Arc::clone(&invoice_payer);
806-
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
878+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), false, nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
807879
assert!(bg_processor.stop().is_ok());
808880
}
809881
}

0 commit comments

Comments
 (0)