@@ -468,7 +468,8 @@ use core::task;
468
468
///
469
469
/// `sleeper` should return a future which completes in the given amount of time and returns a
470
470
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
471
- /// future which outputs true, the loop will exit and this function's future will complete.
471
+ /// future which outputs `true`, the loop will exit and this function's future will complete.
472
+ /// The `sleeper` future is free to return early after it has triggered the exit condition.
472
473
///
473
474
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
474
475
///
@@ -481,6 +482,87 @@ use core::task;
481
482
/// mobile device, where we may need to check for interruption of the application regularly. If you
482
483
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
483
484
/// are hundreds or thousands of simultaneous process calls running.
485
+ ///
486
+ /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
487
+ /// could setup `process_events_async` like this:
488
+ /// ```
489
+ /// # struct MyPersister {}
490
+ /// # impl lightning::util::persist::KVStorePersister for MyPersister {
491
+ /// # fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
492
+ /// # }
493
+ /// # struct MyEventHandler {}
494
+ /// # impl MyEventHandler {
495
+ /// # async fn handle_event(&self, _: lightning::events::Event) {}
496
+ /// # }
497
+ /// # #[derive(Eq, PartialEq, Clone, Hash)]
498
+ /// # struct MySocketDescriptor {}
499
+ /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
500
+ /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
501
+ /// # fn disconnect_socket(&mut self) {}
502
+ /// # }
503
+ /// # use std::sync::{Arc, Mutex};
504
+ /// # use std::sync::atomic::{AtomicBool, Ordering};
505
+ /// # use lightning_background_processor::{process_events_async, GossipSync};
506
+ /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
507
+ /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
508
+ /// # type MyNodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync;
509
+ /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
510
+ /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
511
+ /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
512
+ /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
513
+ /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
514
+ /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
515
+ /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
516
+ /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
517
+ /// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
518
+ ///
519
+ /// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
520
+ /// let background_persister = Arc::clone(&my_persister);
521
+ /// let background_event_handler = Arc::clone(&my_event_handler);
522
+ /// let background_chain_mon = Arc::clone(&my_chain_monitor);
523
+ /// let background_chan_man = Arc::clone(&my_channel_manager);
524
+ /// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
525
+ /// let background_peer_man = Arc::clone(&my_peer_manager);
526
+ /// let background_logger = Arc::clone(&my_logger);
527
+ /// let background_scorer = Arc::clone(&my_scorer);
528
+ ///
529
+ /// // Setup the sleeper.
530
+ /// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
531
+ ///
532
+ /// let sleeper = move |d| {
533
+ /// let mut receiver = stop_receiver.clone();
534
+ /// Box::pin(async move {
535
+ /// tokio::select!{
536
+ /// _ = tokio::time::sleep(d) => false,
537
+ /// _ = receiver.changed() => true,
538
+ /// }
539
+ /// })
540
+ /// };
541
+ ///
542
+ /// let mobile_interruptable_platform = false;
543
+ ///
544
+ /// let handle = tokio::spawn(async move {
545
+ /// process_events_async(
546
+ /// background_persister,
547
+ /// |e| background_event_handler.handle_event(e),
548
+ /// background_chain_mon,
549
+ /// background_chan_man,
550
+ /// background_gossip_sync,
551
+ /// background_peer_man,
552
+ /// background_logger,
553
+ /// Some(background_scorer),
554
+ /// sleeper,
555
+ /// mobile_interruptable_platform,
556
+ /// )
557
+ /// .await
558
+ /// .expect("Failed to process events");
559
+ /// });
560
+ ///
561
+ /// // Stop the background processing.
562
+ /// stop_sender.send(()).unwrap();
563
+ /// handle.await.unwrap();
564
+ /// # }
565
+ ///```
484
566
#[ cfg( feature = "futures" ) ]
485
567
pub async fn process_events_async <
486
568
' a ,
0 commit comments