
Commit d2b3331

Add a background processing function that is async.
Adds a function that operates like BackgroundProcessor::start, but is async rather than spawning a background thread.
1 parent 514e0cf commit d2b3331

File tree

2 files changed: 206 additions & 123 deletions

lightning-background-processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
 bitcoin = "0.28.1"
 lightning = { version = "0.0.110", path = "../lightning", features = ["std"] }
 lightning-rapid-gossip-sync = { version = "0.0.110", path = "../lightning-rapid-gossip-sync" }
+futures = { version = "0.3", optional = true }
 
 [dev-dependencies]
 lightning = { version = "0.0.110", path = "../lightning", features = ["_test_utils"] }
```
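
Because `futures` is declared optional, Cargo exposes it as an implicit feature of the same name, and the new async entry point added in this commit is gated on it. A minimal sketch of how a downstream crate might opt in (illustrative only, not part of the commit; pin whatever release you actually depend on):

```toml
[dependencies]
# Enabling the "futures" feature pulls in the optional dependency and
# compiles the async process_events_async entry point.
lightning-background-processor = { version = "0.0.110", features = ["futures"] }
```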

lightning-background-processor/src/lib.rs

Lines changed: 205 additions & 123 deletions
```diff
@@ -31,6 +31,11 @@ use std::thread::JoinHandle;
 use std::time::{Duration, Instant};
 use std::ops::Deref;
 
+#[cfg(feature = "futures")]
+use futures::select;
+#[cfg(feature = "futures")]
+use futures::future::FutureExt;
+
 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
 /// responsibilities are:
@@ -219,6 +224,203 @@ where A::Target: chain::Access, L::Target: Logger {
 	}
 }
 
+macro_rules! define_run_body {
+	($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
+	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
+	 $loop_exit_check: expr, $await: expr)
+	=> { {
+		let event_handler = DecoratingEventHandler {
+			event_handler: $event_handler,
+			gossip_sync: &$gossip_sync,
+		};
+
+		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
+		$channel_manager.timer_tick_occurred();
+
+		let mut last_freshness_call = Instant::now();
+		let mut last_ping_call = Instant::now();
+		let mut last_prune_call = Instant::now();
+		let mut last_scorer_persist_call = Instant::now();
+		let mut have_pruned = false;
+
+		loop {
+			$channel_manager.process_pending_events(&event_handler);
+			$chain_monitor.process_pending_events(&event_handler);
+
+			// Note that the PeerManager::process_events may block on ChannelManager's locks,
+			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
+			// we want to ensure we get into `persist_manager` as quickly as we can, especially
+			// without running the normal event processing above and handing events to users.
+			//
+			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
+			// processing a message effectively at any point during this loop. In order to
+			// minimize the time between such processing completing and persisting the updated
+			// ChannelManager, we want to minimize methods blocking on a ChannelManager
+			// generally, and as a fallback place such blocking only immediately before
+			// persistence.
+			$peer_manager.process_events();
+
+			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
+			// see `await_start`'s use below.
+			let await_start = Instant::now();
+			let updates_available = $await;
+			let await_time = await_start.elapsed();
+
+			if updates_available {
+				log_trace!($logger, "Persisting ChannelManager...");
+				$persister.persist_manager(&*$channel_manager)?;
+				log_trace!($logger, "Done persisting ChannelManager.");
+			}
+			// Exit the loop if the background processor was requested to stop.
+			if $loop_exit_check {
+				log_trace!($logger, "Terminating background processor.");
+				break;
+			}
+			if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
+				$channel_manager.timer_tick_occurred();
+				last_freshness_call = Instant::now();
+			}
+			if await_time > Duration::from_secs(1) {
+				// On various platforms, we may be starved of CPU cycles for several reasons.
+				// E.g. on iOS, if we've been in the background, we will be entirely paused.
+				// Similarly, if we're on a desktop platform and the device has been asleep, we
+				// may not get any cycles.
+				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
+				// full second, at which point we assume sockets may have been killed (they
+				// appear to be at least on some platforms, even if it has only been a second).
+				// Note that we have to take care to not get here just because user event
+				// processing was slow at the top of the loop. For example, the sample client
+				// may call Bitcoin Core RPCs during event handling, which very often takes
+				// more than a handful of seconds to complete, and shouldn't disconnect all our
+				// peers.
+				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
+				$peer_manager.disconnect_all_peers();
+				last_ping_call = Instant::now();
+			} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
+				$peer_manager.timer_tick_occurred();
+				last_ping_call = Instant::now();
+			}
+
+			// Note that we want to run a graph prune once not long after startup before
+			// falling back to our usual hourly prunes. This avoids short-lived clients never
+			// pruning their network graph. We run once 60 seconds after startup before
+			// continuing our normal cadence.
+			if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+				// The network graph must not be pruned while rapid sync completion is pending
+				log_trace!($logger, "Assessing prunability of network graph");
+				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+					network_graph.remove_stale_channels();
+
+					if let Err(e) = $persister.persist_graph(network_graph) {
+						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
+					}
+
+					last_prune_call = Instant::now();
+					have_pruned = true;
+				} else {
+					log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
+				}
+			}
+
+			if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+				if let Some(ref scorer) = $scorer {
+					log_trace!($logger, "Persisting scorer");
+					if let Err(e) = $persister.persist_scorer(&scorer) {
+						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+					}
+				}
+				last_scorer_persist_call = Instant::now();
+			}
+		}
+
+		// After we exit, ensure we persist the ChannelManager one final time - this avoids
+		// some races where users quit while channel updates were in-flight, with
+		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+		$persister.persist_manager(&*$channel_manager)?;
+
+		// Persist Scorer on exit
+		if let Some(ref scorer) = $scorer {
+			$persister.persist_scorer(&scorer)?;
+		}
+
+		// Persist NetworkGraph on exit
+		if let Some(network_graph) = $gossip_sync.network_graph() {
+			$persister.persist_graph(network_graph)?;
+		}
+
+		Ok(())
+	} }
+}
+
+/// Processes background events in a future.
+///
+/// `sleeper` should return a future which completes in the given amount of time and returns a
+/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
+/// future which outputs false, the loop will exit and this function's future will complete.
+///
+/// See [`BackgroundProcessor::start`] for information on which actions this handles.
+#[cfg(feature = "futures")]
+pub async fn process_events_async<
+	'a,
+	Signer: 'static + Sign,
+	CA: 'static + Deref + Send + Sync,
+	CF: 'static + Deref + Send + Sync,
+	CW: 'static + Deref + Send + Sync,
+	T: 'static + Deref + Send + Sync,
+	K: 'static + Deref + Send + Sync,
+	F: 'static + Deref + Send + Sync,
+	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
+	L: 'static + Deref + Send + Sync,
+	P: 'static + Deref + Send + Sync,
+	Descriptor: 'static + SocketDescriptor + Send + Sync,
+	CMH: 'static + Deref + Send + Sync,
+	RMH: 'static + Deref + Send + Sync,
+	EH: 'static + EventHandler + Send,
+	PS: 'static + Deref + Send,
+	M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+	CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
+	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
+	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
+	UMH: 'static + Deref + Send + Sync,
+	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+	S: 'static + Deref<Target = SC> + Send + Sync,
+	SC: WriteableScore<'a>,
+	SleepFuture: core::future::Future<Output = bool>,
+	Sleeper: Fn(Duration) -> SleepFuture
+>(
+	persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+	sleeper: Sleeper,
+) -> Result<(), std::io::Error>
+where
+	CA::Target: 'static + chain::Access,
+	CF::Target: 'static + chain::Filter,
+	CW::Target: 'static + chain::Watch<Signer>,
+	T::Target: 'static + BroadcasterInterface,
+	K::Target: 'static + KeysInterface<Signer = Signer>,
+	F::Target: 'static + FeeEstimator,
+	L::Target: 'static + Logger,
+	P::Target: 'static + Persist<Signer>,
+	CMH::Target: 'static + ChannelMessageHandler,
+	RMH::Target: 'static + RoutingMessageHandler,
+	UMH::Target: 'static + CustomMessageHandler,
+	PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+{
+	let mut should_continue = true;
+	define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+		gossip_sync, peer_manager, logger, scorer, should_continue, {
+			select! {
+				_ = channel_manager.get_persistable_update_future().fuse() => true,
+				cont = sleeper(Duration::from_millis(100)).fuse() => {
+					should_continue = cont;
+					false
+				}
+			}
+		})
+}
+
 impl BackgroundProcessor {
 	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
 	/// documentation].
```
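
For context, here is a sketch of how a caller on an async runtime might satisfy the `sleeper` contract described in the doc comment above. This is not part of the commit: the LDK components (`persister`, `event_handler`, `chain_monitor`, and so on) are assumed to be constructed exactly as they would be for `BackgroundProcessor::start`, `exit_flag` is a hypothetical application-side shutdown signal, and Tokio is one possible runtime choice.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

// Hypothetical shutdown signal owned by the application.
let exit_flag = Arc::new(AtomicBool::new(false));

// The sleeper resolves after `duration` and reports whether processing should
// continue, matching the `Fn(Duration) -> impl Future<Output = bool>` bound.
let sleeper = {
	let exit_flag = Arc::clone(&exit_flag);
	move |duration: Duration| {
		let exit_flag = Arc::clone(&exit_flag);
		async move {
			tokio::time::sleep(duration).await;
			!exit_flag.load(Ordering::Acquire)
		}
	}
};

// `persister`, `event_handler`, `chain_monitor`, `channel_manager`,
// `gossip_sync`, `peer_manager`, `logger`, and `scorer` are assumed to be
// set up as for BackgroundProcessor::start.
let bg_task = tokio::spawn(process_events_async(
	persister, event_handler, chain_monitor, channel_manager,
	gossip_sync, peer_manager, logger, Some(scorer), sleeper,
));

// Later, on shutdown: make the sleeper report false, then await the task.
exit_flag.store(true, Ordering::Release);
bg_task.await.expect("task panicked").expect("I/O error during persistence");
```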
```diff
@@ -310,129 +512,9 @@ impl BackgroundProcessor {
 		let stop_thread = Arc::new(AtomicBool::new(false));
 		let stop_thread_clone = stop_thread.clone();
 		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
-			let event_handler = DecoratingEventHandler {
-				event_handler,
-				gossip_sync: &gossip_sync,
-			};
-
-			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
-			channel_manager.timer_tick_occurred();
-
-			let mut last_freshness_call = Instant::now();
-			let mut last_ping_call = Instant::now();
-			let mut last_prune_call = Instant::now();
-			let mut last_scorer_persist_call = Instant::now();
-			let mut have_pruned = false;
-
-			loop {
-				channel_manager.process_pending_events(&event_handler);
-				chain_monitor.process_pending_events(&event_handler);
-
-				// Note that the PeerManager::process_events may block on ChannelManager's locks,
-				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
-				// we want to ensure we get into `persist_manager` as quickly as we can, especially
-				// without running the normal event processing above and handing events to users.
-				//
-				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
-				// processing a message effectively at any point during this loop. In order to
-				// minimize the time between such processing completing and persisting the updated
-				// ChannelManager, we want to minimize methods blocking on a ChannelManager
-				// generally, and as a fallback place such blocking only immediately before
-				// persistence.
-				peer_manager.process_events();
-
-				// We wait up to 100ms, but track how long it takes to detect being put to sleep,
-				// see `await_start`'s use below.
-				let await_start = Instant::now();
-				let updates_available =
-					channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
-				let await_time = await_start.elapsed();
-
-				if updates_available {
-					log_trace!(logger, "Persisting ChannelManager...");
-					persister.persist_manager(&*channel_manager)?;
-					log_trace!(logger, "Done persisting ChannelManager.");
-				}
-				// Exit the loop if the background processor was requested to stop.
-				if stop_thread.load(Ordering::Acquire) == true {
-					log_trace!(logger, "Terminating background processor.");
-					break;
-				}
-				if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
-					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
-					channel_manager.timer_tick_occurred();
-					last_freshness_call = Instant::now();
-				}
-				if await_time > Duration::from_secs(1) {
-					// On various platforms, we may be starved of CPU cycles for several reasons.
-					// E.g. on iOS, if we've been in the background, we will be entirely paused.
-					// Similarly, if we're on a desktop platform and the device has been asleep, we
-					// may not get any cycles.
-					// We detect this by checking if our max-100ms-sleep, above, ran longer than a
-					// full second, at which point we assume sockets may have been killed (they
-					// appear to be at least on some platforms, even if it has only been a second).
-					// Note that we have to take care to not get here just because user event
-					// processing was slow at the top of the loop. For example, the sample client
-					// may call Bitcoin Core RPCs during event handling, which very often takes
-					// more than a handful of seconds to complete, and shouldn't disconnect all our
-					// peers.
-					log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
-					peer_manager.disconnect_all_peers();
-					last_ping_call = Instant::now();
-				} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
-					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
-					peer_manager.timer_tick_occurred();
-					last_ping_call = Instant::now();
-				}
-
-				// Note that we want to run a graph prune once not long after startup before
-				// falling back to our usual hourly prunes. This avoids short-lived clients never
-				// pruning their network graph. We run once 60 seconds after startup before
-				// continuing our normal cadence.
-				if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
-					// The network graph must not be pruned while rapid sync completion is pending
-					log_trace!(logger, "Assessing prunability of network graph");
-					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
-						network_graph.remove_stale_channels();
-
-						if let Err(e) = persister.persist_graph(network_graph) {
-							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
-						}
-
-						last_prune_call = Instant::now();
-						have_pruned = true;
-					} else {
-						log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
-					}
-				}
-
-				if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
-					if let Some(ref scorer) = scorer {
-						log_trace!(logger, "Persisting scorer");
-						if let Err(e) = persister.persist_scorer(&scorer) {
-							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
-						}
-					}
-					last_scorer_persist_call = Instant::now();
-				}
-			}
-
-			// After we exit, ensure we persist the ChannelManager one final time - this avoids
-			// some races where users quit while channel updates were in-flight, with
-			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-			persister.persist_manager(&*channel_manager)?;
-
-			// Persist Scorer on exit
-			if let Some(ref scorer) = scorer {
-				persister.persist_scorer(&scorer)?;
-			}
-
-			// Persist NetworkGraph on exit
-			if let Some(network_graph) = gossip_sync.network_graph() {
-				persister.persist_graph(network_graph)?;
-			}
-
-			Ok(())
+			define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
+				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
 		});
 		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
 	}
```
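
After this hunk, both code paths share define_run_body!; only the wait primitive and the exit check differ. The threaded variant blocks in await_persistable_update_timeout and exits when stop_thread is set, while the async variant exits once the sleeper's future resolves to false, as in the sketch above. A brief illustrative sketch of the threaded shutdown path (again assuming the components are already constructed):

```rust
// Threaded variant: the exit check is stop_thread.load(Ordering::Acquire),
// which BackgroundProcessor::stop() flips before joining the thread and
// surfacing any persistence error from the final writes.
let bg_processor = BackgroundProcessor::start(
	persister, event_handler, chain_monitor, channel_manager,
	gossip_sync, peer_manager, logger, Some(scorer),
);
// ... node runs ...
bg_processor.stop().expect("I/O error while persisting on shutdown");
```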
