Skip to content

Commit 8c6cb99

Browse files
Merge pull request #1657 from TheBlueMatt/2022-08-async-man-update
Add a background processor which is async
2 parents ee2f1a9 + 2a5bac2 commit 8c6cb99

File tree

5 files changed

+557
-258
lines changed

5 files changed

+557
-258
lines changed

lightning-background-processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1717
bitcoin = "0.29.0"
1818
lightning = { version = "0.0.110", path = "../lightning", features = ["std"] }
1919
lightning-rapid-gossip-sync = { version = "0.0.110", path = "../lightning-rapid-gossip-sync" }
20+
futures = { version = "0.3", optional = true }
2021

2122
[dev-dependencies]
2223
lightning = { version = "0.0.110", path = "../lightning", features = ["_test_utils"] }

lightning-background-processor/src/lib.rs

Lines changed: 203 additions & 123 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,9 @@ use std::thread::JoinHandle;
3434
use std::time::{Duration, Instant};
3535
use std::ops::Deref;
3636

37+
#[cfg(feature = "futures")]
38+
use futures::{select, future::FutureExt};
39+
3740
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
3841
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
3942
/// responsibilities are:
@@ -222,6 +225,203 @@ where A::Target: chain::Access, L::Target: Logger {
222225
}
223226
}
224227

228+
macro_rules! define_run_body {
229+
($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
230+
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
231+
$loop_exit_check: expr, $await: expr)
232+
=> { {
233+
let event_handler = DecoratingEventHandler {
234+
event_handler: $event_handler,
235+
gossip_sync: &$gossip_sync,
236+
};
237+
238+
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
239+
$channel_manager.timer_tick_occurred();
240+
241+
let mut last_freshness_call = Instant::now();
242+
let mut last_ping_call = Instant::now();
243+
let mut last_prune_call = Instant::now();
244+
let mut last_scorer_persist_call = Instant::now();
245+
let mut have_pruned = false;
246+
247+
loop {
248+
$channel_manager.process_pending_events(&event_handler);
249+
$chain_monitor.process_pending_events(&event_handler);
250+
251+
// Note that the PeerManager::process_events may block on ChannelManager's locks,
252+
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
253+
// we want to ensure we get into `persist_manager` as quickly as we can, especially
254+
// without running the normal event processing above and handing events to users.
255+
//
256+
// Specifically, on an *extremely* slow machine, we may see ChannelManager start
257+
// processing a message effectively at any point during this loop. In order to
258+
// minimize the time between such processing completing and persisting the updated
259+
// ChannelManager, we want to minimize methods blocking on a ChannelManager
260+
// generally, and as a fallback place such blocking only immediately before
261+
// persistence.
262+
$peer_manager.process_events();
263+
264+
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
265+
// see `await_start`'s use below.
266+
let await_start = Instant::now();
267+
let updates_available = $await;
268+
let await_time = await_start.elapsed();
269+
270+
if updates_available {
271+
log_trace!($logger, "Persisting ChannelManager...");
272+
$persister.persist_manager(&*$channel_manager)?;
273+
log_trace!($logger, "Done persisting ChannelManager.");
274+
}
275+
// Exit the loop if the background processor was requested to stop.
276+
if $loop_exit_check {
277+
log_trace!($logger, "Terminating background processor.");
278+
break;
279+
}
280+
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
281+
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
282+
$channel_manager.timer_tick_occurred();
283+
last_freshness_call = Instant::now();
284+
}
285+
if await_time > Duration::from_secs(1) {
286+
// On various platforms, we may be starved of CPU cycles for several reasons.
287+
// E.g. on iOS, if we've been in the background, we will be entirely paused.
288+
// Similarly, if we're on a desktop platform and the device has been asleep, we
289+
// may not get any cycles.
290+
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
291+
// full second, at which point we assume sockets may have been killed (they
292+
// appear to be at least on some platforms, even if it has only been a second).
293+
// Note that we have to take care to not get here just because user event
294+
// processing was slow at the top of the loop. For example, the sample client
295+
// may call Bitcoin Core RPCs during event handling, which very often takes
296+
// more than a handful of seconds to complete, and shouldn't disconnect all our
297+
// peers.
298+
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
299+
$peer_manager.disconnect_all_peers();
300+
last_ping_call = Instant::now();
301+
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
302+
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
303+
$peer_manager.timer_tick_occurred();
304+
last_ping_call = Instant::now();
305+
}
306+
307+
// Note that we want to run a graph prune once not long after startup before
308+
// falling back to our usual hourly prunes. This avoids short-lived clients never
309+
// pruning their network graph. We run once 60 seconds after startup before
310+
// continuing our normal cadence.
311+
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
312+
// The network graph must not be pruned while rapid sync completion is pending
313+
log_trace!($logger, "Assessing prunability of network graph");
314+
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
315+
network_graph.remove_stale_channels();
316+
317+
if let Err(e) = $persister.persist_graph(network_graph) {
318+
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
319+
}
320+
321+
last_prune_call = Instant::now();
322+
have_pruned = true;
323+
} else {
324+
log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
325+
}
326+
}
327+
328+
if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
329+
if let Some(ref scorer) = $scorer {
330+
log_trace!($logger, "Persisting scorer");
331+
if let Err(e) = $persister.persist_scorer(&scorer) {
332+
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
333+
}
334+
}
335+
last_scorer_persist_call = Instant::now();
336+
}
337+
}
338+
339+
// After we exit, ensure we persist the ChannelManager one final time - this avoids
340+
// some races where users quit while channel updates were in-flight, with
341+
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
342+
$persister.persist_manager(&*$channel_manager)?;
343+
344+
// Persist Scorer on exit
345+
if let Some(ref scorer) = $scorer {
346+
$persister.persist_scorer(&scorer)?;
347+
}
348+
349+
// Persist NetworkGraph on exit
350+
if let Some(network_graph) = $gossip_sync.network_graph() {
351+
$persister.persist_graph(network_graph)?;
352+
}
353+
354+
Ok(())
355+
} }
356+
}
357+
358+
/// Processes background events in a future.
359+
///
360+
/// `sleeper` should return a future which completes in the given amount of time and returns a
361+
/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
362+
/// future which outputs false, the loop will exit and this function's future will complete.
363+
///
364+
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
365+
#[cfg(feature = "futures")]
366+
pub async fn process_events_async<
367+
'a,
368+
Signer: 'static + Sign,
369+
CA: 'static + Deref + Send + Sync,
370+
CF: 'static + Deref + Send + Sync,
371+
CW: 'static + Deref + Send + Sync,
372+
T: 'static + Deref + Send + Sync,
373+
K: 'static + Deref + Send + Sync,
374+
F: 'static + Deref + Send + Sync,
375+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
376+
L: 'static + Deref + Send + Sync,
377+
P: 'static + Deref + Send + Sync,
378+
Descriptor: 'static + SocketDescriptor + Send + Sync,
379+
CMH: 'static + Deref + Send + Sync,
380+
RMH: 'static + Deref + Send + Sync,
381+
EH: 'static + EventHandler + Send,
382+
PS: 'static + Deref + Send,
383+
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
384+
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
385+
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
386+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
387+
UMH: 'static + Deref + Send + Sync,
388+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
389+
S: 'static + Deref<Target = SC> + Send + Sync,
390+
SC: WriteableScore<'a>,
391+
SleepFuture: core::future::Future<Output = bool>,
392+
Sleeper: Fn(Duration) -> SleepFuture
393+
>(
394+
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
395+
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
396+
sleeper: Sleeper,
397+
) -> Result<(), std::io::Error>
398+
where
399+
CA::Target: 'static + chain::Access,
400+
CF::Target: 'static + chain::Filter,
401+
CW::Target: 'static + chain::Watch<Signer>,
402+
T::Target: 'static + BroadcasterInterface,
403+
K::Target: 'static + KeysInterface<Signer = Signer>,
404+
F::Target: 'static + FeeEstimator,
405+
L::Target: 'static + Logger,
406+
P::Target: 'static + Persist<Signer>,
407+
CMH::Target: 'static + ChannelMessageHandler,
408+
RMH::Target: 'static + RoutingMessageHandler,
409+
UMH::Target: 'static + CustomMessageHandler,
410+
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
411+
{
412+
let mut should_continue = true;
413+
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
414+
gossip_sync, peer_manager, logger, scorer, should_continue, {
415+
select! {
416+
_ = channel_manager.get_persistable_update_future().fuse() => true,
417+
cont = sleeper(Duration::from_millis(100)).fuse() => {
418+
should_continue = cont;
419+
false
420+
}
421+
}
422+
})
423+
}
424+
225425
impl BackgroundProcessor {
226426
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
227427
/// documentation].
@@ -315,129 +515,9 @@ impl BackgroundProcessor {
315515
let stop_thread = Arc::new(AtomicBool::new(false));
316516
let stop_thread_clone = stop_thread.clone();
317517
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
318-
let event_handler = DecoratingEventHandler {
319-
event_handler,
320-
gossip_sync: &gossip_sync,
321-
};
322-
323-
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
324-
channel_manager.timer_tick_occurred();
325-
326-
let mut last_freshness_call = Instant::now();
327-
let mut last_ping_call = Instant::now();
328-
let mut last_prune_call = Instant::now();
329-
let mut last_scorer_persist_call = Instant::now();
330-
let mut have_pruned = false;
331-
332-
loop {
333-
channel_manager.process_pending_events(&event_handler);
334-
chain_monitor.process_pending_events(&event_handler);
335-
336-
// Note that the PeerManager::process_events may block on ChannelManager's locks,
337-
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
338-
// we want to ensure we get into `persist_manager` as quickly as we can, especially
339-
// without running the normal event processing above and handing events to users.
340-
//
341-
// Specifically, on an *extremely* slow machine, we may see ChannelManager start
342-
// processing a message effectively at any point during this loop. In order to
343-
// minimize the time between such processing completing and persisting the updated
344-
// ChannelManager, we want to minimize methods blocking on a ChannelManager
345-
// generally, and as a fallback place such blocking only immediately before
346-
// persistence.
347-
peer_manager.process_events();
348-
349-
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
350-
// see `await_start`'s use below.
351-
let await_start = Instant::now();
352-
let updates_available =
353-
channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
354-
let await_time = await_start.elapsed();
355-
356-
if updates_available {
357-
log_trace!(logger, "Persisting ChannelManager...");
358-
persister.persist_manager(&*channel_manager)?;
359-
log_trace!(logger, "Done persisting ChannelManager.");
360-
}
361-
// Exit the loop if the background processor was requested to stop.
362-
if stop_thread.load(Ordering::Acquire) == true {
363-
log_trace!(logger, "Terminating background processor.");
364-
break;
365-
}
366-
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
367-
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
368-
channel_manager.timer_tick_occurred();
369-
last_freshness_call = Instant::now();
370-
}
371-
if await_time > Duration::from_secs(1) {
372-
// On various platforms, we may be starved of CPU cycles for several reasons.
373-
// E.g. on iOS, if we've been in the background, we will be entirely paused.
374-
// Similarly, if we're on a desktop platform and the device has been asleep, we
375-
// may not get any cycles.
376-
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
377-
// full second, at which point we assume sockets may have been killed (they
378-
// appear to be at least on some platforms, even if it has only been a second).
379-
// Note that we have to take care to not get here just because user event
380-
// processing was slow at the top of the loop. For example, the sample client
381-
// may call Bitcoin Core RPCs during event handling, which very often takes
382-
// more than a handful of seconds to complete, and shouldn't disconnect all our
383-
// peers.
384-
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
385-
peer_manager.disconnect_all_peers();
386-
last_ping_call = Instant::now();
387-
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
388-
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
389-
peer_manager.timer_tick_occurred();
390-
last_ping_call = Instant::now();
391-
}
392-
393-
// Note that we want to run a graph prune once not long after startup before
394-
// falling back to our usual hourly prunes. This avoids short-lived clients never
395-
// pruning their network graph. We run once 60 seconds after startup before
396-
// continuing our normal cadence.
397-
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
398-
// The network graph must not be pruned while rapid sync completion is pending
399-
log_trace!(logger, "Assessing prunability of network graph");
400-
if let Some(network_graph) = gossip_sync.prunable_network_graph() {
401-
network_graph.remove_stale_channels();
402-
403-
if let Err(e) = persister.persist_graph(network_graph) {
404-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
405-
}
406-
407-
last_prune_call = Instant::now();
408-
have_pruned = true;
409-
} else {
410-
log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
411-
}
412-
}
413-
414-
if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
415-
if let Some(ref scorer) = scorer {
416-
log_trace!(logger, "Persisting scorer");
417-
if let Err(e) = persister.persist_scorer(&scorer) {
418-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
419-
}
420-
}
421-
last_scorer_persist_call = Instant::now();
422-
}
423-
}
424-
425-
// After we exit, ensure we persist the ChannelManager one final time - this avoids
426-
// some races where users quit while channel updates were in-flight, with
427-
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
428-
persister.persist_manager(&*channel_manager)?;
429-
430-
// Persist Scorer on exit
431-
if let Some(ref scorer) = scorer {
432-
persister.persist_scorer(&scorer)?;
433-
}
434-
435-
// Persist NetworkGraph on exit
436-
if let Some(network_graph) = gossip_sync.network_graph() {
437-
persister.persist_graph(network_graph)?;
438-
}
439-
440-
Ok(())
518+
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
519+
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
520+
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
441521
});
442522
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
443523
}

0 commit comments

Comments (0)