Skip to content

Commit d29e2ba

Browse files
committed
Use AChannelManager in BackgroundProcessor
Replace instance of ChannelManager in BackgroundProcessor and in Persister with AChannelManager. This reduces the number of type parameters need in those types, which would need to be repeated in an async version of Persister.
1 parent 82a13b5 commit d29e2ba

File tree

2 files changed

+100
-103
lines changed

2 files changed

+100
-103
lines changed

lightning-background-processor/src/lib.rs

+69-68
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,17 @@ extern crate lightning_rapid_gossip_sync;
2424
use lightning::chain;
2525
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
2626
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
27-
use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
2827
use lightning::events::{Event, PathFailure};
2928
#[cfg(feature = "std")]
3029
use lightning::events::EventHandler;
3130
#[cfg(any(feature = "std", feature = "futures"))]
3231
use lightning::events::EventsProvider;
3332

34-
use lightning::ln::channelmanager::ChannelManager;
33+
use lightning::ln::channelmanager::AChannelManager;
3534
use lightning::ln::msgs::OnionMessageHandler;
3635
use lightning::ln::peer_handler::APeerManager;
3736
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3837
use lightning::routing::utxo::UtxoLookup;
39-
use lightning::routing::router::Router;
4038
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
4139
use lightning::util::logger::Logger;
4240
use lightning::util::persist::Persister;
@@ -81,6 +79,8 @@ use alloc::vec::Vec;
8179
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
8280
/// unilateral chain closure fees are at risk.
8381
///
82+
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
83+
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
8484
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
8585
/// [`Event`]: lightning::events::Event
8686
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
@@ -286,7 +286,7 @@ macro_rules! define_run_body {
286286
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
287287
) => { {
288288
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
289-
$channel_manager.timer_tick_occurred();
289+
$channel_manager.get_cm().timer_tick_occurred();
290290
log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
291291
$chain_monitor.rebroadcast_pending_claims();
292292

@@ -336,14 +336,14 @@ macro_rules! define_run_body {
336336
break;
337337
}
338338

339-
if $channel_manager.get_and_clear_needs_persistence() {
339+
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
340340
log_trace!($logger, "Persisting ChannelManager...");
341-
$persister.persist_manager(&*$channel_manager)?;
341+
$persister.persist_manager(&$channel_manager)?;
342342
log_trace!($logger, "Done persisting ChannelManager.");
343343
}
344344
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
345345
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
346-
$channel_manager.timer_tick_occurred();
346+
$channel_manager.get_cm().timer_tick_occurred();
347347
last_freshness_call = $get_timer(FRESHNESS_TIMER);
348348
}
349349
if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
@@ -440,7 +440,7 @@ macro_rules! define_run_body {
440440
// After we exit, ensure we persist the ChannelManager one final time - this avoids
441441
// some races where users quit while channel updates were in-flight, with
442442
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
443-
$persister.persist_manager(&*$channel_manager)?;
443+
$persister.persist_manager(&$channel_manager)?;
444444

445445
// Persist Scorer on exit
446446
if let Some(ref scorer) = $scorer {
@@ -539,45 +539,64 @@ use core::task;
539539
/// # use std::sync::atomic::{AtomicBool, Ordering};
540540
/// # use std::time::SystemTime;
541541
/// # use lightning_background_processor::{process_events_async, GossipSync};
542-
/// # struct MyStore {}
543-
/// # impl lightning::util::persist::KVStore for MyStore {
542+
/// # struct Logger {}
543+
/// # impl lightning::util::logger::Logger for Logger {
544+
/// # fn log(&self, _record: lightning::util::logger::Record) {}
545+
/// # }
546+
/// # struct Store {}
547+
/// # impl lightning::util::persist::KVStore for Store {
544548
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
545549
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
546550
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
547551
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
548552
/// # }
549-
/// # struct MyEventHandler {}
550-
/// # impl MyEventHandler {
553+
/// # struct EventHandler {}
554+
/// # impl EventHandler {
551555
/// # async fn handle_event(&self, _: lightning::events::Event) {}
552556
/// # }
553557
/// # #[derive(Eq, PartialEq, Clone, Hash)]
554-
/// # struct MySocketDescriptor {}
555-
/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
558+
/// # struct SocketDescriptor {}
559+
/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
556560
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
557561
/// # fn disconnect_socket(&mut self) {}
558562
/// # }
559-
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
560-
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
561-
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
562-
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
563-
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
564-
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
565-
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
566-
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
567-
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
568-
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
569-
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
570-
/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
571-
///
572-
/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
573-
/// let background_persister = Arc::clone(&my_persister);
574-
/// let background_event_handler = Arc::clone(&my_event_handler);
575-
/// let background_chain_mon = Arc::clone(&my_chain_monitor);
576-
/// let background_chan_man = Arc::clone(&my_channel_manager);
577-
/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
578-
/// let background_peer_man = Arc::clone(&my_peer_manager);
579-
/// let background_logger = Arc::clone(&my_logger);
580-
/// let background_scorer = Arc::clone(&my_scorer);
563+
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>>;
564+
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
565+
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
566+
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
567+
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
568+
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
569+
/// #
570+
/// # struct Node<
571+
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
572+
/// # F: lightning::chain::Filter + Send + Sync + 'static,
573+
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
574+
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
575+
/// # > {
576+
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
577+
/// # event_handler: Arc<EventHandler>,
578+
/// # channel_manager: Arc<ChannelManager<B, F, FE>>,
579+
/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
580+
/// # gossip_sync: Arc<P2PGossipSync<UL>>,
581+
/// # persister: Arc<Store>,
582+
/// # logger: Arc<Logger>,
583+
/// # scorer: Arc<Scorer>,
584+
/// # }
585+
/// #
586+
/// # async fn setup_background_processing<
587+
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
588+
/// # F: lightning::chain::Filter + Send + Sync + 'static,
589+
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
590+
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
591+
/// # >(node: Node<B, F, FE, UL>) {
592+
/// let background_persister = Arc::clone(&node.persister);
593+
/// let background_event_handler = Arc::clone(&node.event_handler);
594+
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
595+
/// let background_chan_man = Arc::clone(&node.channel_manager);
596+
/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
597+
/// let background_peer_man = Arc::clone(&node.peer_manager);
598+
/// let background_logger = Arc::clone(&node.logger);
599+
/// let background_scorer = Arc::clone(&node.scorer);
581600
///
582601
/// // Setup the sleeper.
583602
/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
@@ -622,21 +641,16 @@ pub async fn process_events_async<
622641
'a,
623642
UL: 'static + Deref + Send + Sync,
624643
CF: 'static + Deref + Send + Sync,
625-
CW: 'static + Deref + Send + Sync,
626644
T: 'static + Deref + Send + Sync,
627-
ES: 'static + Deref + Send + Sync,
628-
NS: 'static + Deref + Send + Sync,
629-
SP: 'static + Deref + Send + Sync,
630645
F: 'static + Deref + Send + Sync,
631-
R: 'static + Deref + Send + Sync,
632646
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
633647
L: 'static + Deref + Send + Sync,
634648
P: 'static + Deref + Send + Sync,
635649
EventHandlerFuture: core::future::Future<Output = ()>,
636650
EventHandler: Fn(Event) -> EventHandlerFuture,
637651
PS: 'static + Deref + Send,
638-
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
639-
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
652+
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
653+
CM: 'static + Deref + Send + Sync,
640654
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
641655
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
642656
PM: 'static + Deref + Send + Sync,
@@ -653,16 +667,12 @@ pub async fn process_events_async<
653667
where
654668
UL::Target: 'static + UtxoLookup,
655669
CF::Target: 'static + chain::Filter,
656-
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
657670
T::Target: 'static + BroadcasterInterface,
658-
ES::Target: 'static + EntropySource,
659-
NS::Target: 'static + NodeSigner,
660-
SP::Target: 'static + SignerProvider,
661671
F::Target: 'static + FeeEstimator,
662-
R::Target: 'static + Router,
663672
L::Target: 'static + Logger,
664-
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
665-
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
673+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
674+
PS::Target: 'static + Persister<'a, CM, L, SC>,
675+
CM::Target: AChannelManager + Send + Sync,
666676
PM::Target: APeerManager + Send + Sync,
667677
{
668678
let mut should_break = false;
@@ -693,11 +703,11 @@ where
693703
define_run_body!(
694704
persister, chain_monitor,
695705
chain_monitor.process_pending_events_async(async_event_handler).await,
696-
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
706+
channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
697707
peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
698708
gossip_sync, logger, scorer, should_break, {
699709
let fut = Selector {
700-
a: channel_manager.get_event_or_persistence_needed_future(),
710+
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
701711
b: chain_monitor.get_update_future(),
702712
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
703713
};
@@ -788,20 +798,15 @@ impl BackgroundProcessor {
788798
'a,
789799
UL: 'static + Deref + Send + Sync,
790800
CF: 'static + Deref + Send + Sync,
791-
CW: 'static + Deref + Send + Sync,
792801
T: 'static + Deref + Send + Sync,
793-
ES: 'static + Deref + Send + Sync,
794-
NS: 'static + Deref + Send + Sync,
795-
SP: 'static + Deref + Send + Sync,
796802
F: 'static + Deref + Send + Sync,
797-
R: 'static + Deref + Send + Sync,
798803
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
799804
L: 'static + Deref + Send + Sync,
800805
P: 'static + Deref + Send + Sync,
801806
EH: 'static + EventHandler + Send,
802807
PS: 'static + Deref + Send,
803-
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
804-
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
808+
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
809+
CM: 'static + Deref + Send + Sync,
805810
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
806811
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
807812
PM: 'static + Deref + Send + Sync,
@@ -814,16 +819,12 @@ impl BackgroundProcessor {
814819
where
815820
UL::Target: 'static + UtxoLookup,
816821
CF::Target: 'static + chain::Filter,
817-
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
818822
T::Target: 'static + BroadcasterInterface,
819-
ES::Target: 'static + EntropySource,
820-
NS::Target: 'static + NodeSigner,
821-
SP::Target: 'static + SignerProvider,
822823
F::Target: 'static + FeeEstimator,
823-
R::Target: 'static + Router,
824824
L::Target: 'static + Logger,
825-
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
826-
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
825+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
826+
PS::Target: 'static + Persister<'a, CM, L, SC>,
827+
CM::Target: AChannelManager + Send + Sync,
827828
PM::Target: APeerManager + Send + Sync,
828829
{
829830
let stop_thread = Arc::new(AtomicBool::new(false));
@@ -849,12 +850,12 @@ impl BackgroundProcessor {
849850
};
850851
define_run_body!(
851852
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
852-
channel_manager, channel_manager.process_pending_events(&event_handler),
853+
channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
853854
peer_manager,
854855
peer_manager.onion_message_handler().process_pending_events(&event_handler),
855856
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
856857
{ Sleeper::from_two_futures(
857-
&channel_manager.get_event_or_persistence_needed_future(),
858+
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
858859
&chain_monitor.get_update_future()
859860
).wait_timeout(Duration::from_millis(100)); },
860861
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,

0 commit comments

Comments
 (0)