Skip to content

Commit b8b1ef3

Browse files
authored
Merge pull request #2963 from jkczyz/2024-03-a-channel-manager
Use `AChannelManager` in `BackgroundProcessor`
2 parents 6d11111 + d29e2ba commit b8b1ef3

File tree

2 files changed

+103
-112
lines changed

2 files changed

+103
-112
lines changed

lightning-background-processor/src/lib.rs

+72-71
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,17 @@ extern crate lightning_rapid_gossip_sync;
2424
use lightning::chain;
2525
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
2626
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
27-
use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
2827
use lightning::events::{Event, PathFailure};
2928
#[cfg(feature = "std")]
3029
use lightning::events::EventHandler;
3130
#[cfg(any(feature = "std", feature = "futures"))]
3231
use lightning::events::EventsProvider;
3332

34-
use lightning::ln::channelmanager::ChannelManager;
33+
use lightning::ln::channelmanager::AChannelManager;
3534
use lightning::ln::msgs::OnionMessageHandler;
3635
use lightning::ln::peer_handler::APeerManager;
3736
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3837
use lightning::routing::utxo::UtxoLookup;
39-
use lightning::routing::router::Router;
4038
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
4139
use lightning::util::logger::Logger;
4240
use lightning::util::persist::Persister;
@@ -81,6 +79,8 @@ use alloc::vec::Vec;
8179
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
8280
/// unilateral chain closure fees are at risk.
8381
///
82+
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
83+
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
8484
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
8585
/// [`Event`]: lightning::events::Event
8686
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
@@ -286,7 +286,7 @@ macro_rules! define_run_body {
286286
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
287287
) => { {
288288
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
289-
$channel_manager.timer_tick_occurred();
289+
$channel_manager.get_cm().timer_tick_occurred();
290290
log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
291291
$chain_monitor.rebroadcast_pending_claims();
292292

@@ -336,14 +336,14 @@ macro_rules! define_run_body {
336336
break;
337337
}
338338

339-
if $channel_manager.get_and_clear_needs_persistence() {
339+
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
340340
log_trace!($logger, "Persisting ChannelManager...");
341-
$persister.persist_manager(&*$channel_manager)?;
341+
$persister.persist_manager(&$channel_manager)?;
342342
log_trace!($logger, "Done persisting ChannelManager.");
343343
}
344344
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
345345
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
346-
$channel_manager.timer_tick_occurred();
346+
$channel_manager.get_cm().timer_tick_occurred();
347347
last_freshness_call = $get_timer(FRESHNESS_TIMER);
348348
}
349349
if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
@@ -440,7 +440,7 @@ macro_rules! define_run_body {
440440
// After we exit, ensure we persist the ChannelManager one final time - this avoids
441441
// some races where users quit while channel updates were in-flight, with
442442
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
443-
$persister.persist_manager(&*$channel_manager)?;
443+
$persister.persist_manager(&$channel_manager)?;
444444

445445
// Persist Scorer on exit
446446
if let Some(ref scorer) = $scorer {
@@ -539,45 +539,64 @@ use core::task;
539539
/// # use std::sync::atomic::{AtomicBool, Ordering};
540540
/// # use std::time::SystemTime;
541541
/// # use lightning_background_processor::{process_events_async, GossipSync};
542-
/// # struct MyStore {}
543-
/// # impl lightning::util::persist::KVStore for MyStore {
542+
/// # struct Logger {}
543+
/// # impl lightning::util::logger::Logger for Logger {
544+
/// # fn log(&self, _record: lightning::util::logger::Record) {}
545+
/// # }
546+
/// # struct Store {}
547+
/// # impl lightning::util::persist::KVStore for Store {
544548
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
545549
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
546550
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
547551
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
548552
/// # }
549-
/// # struct MyEventHandler {}
550-
/// # impl MyEventHandler {
553+
/// # struct EventHandler {}
554+
/// # impl EventHandler {
551555
/// # async fn handle_event(&self, _: lightning::events::Event) {}
552556
/// # }
553557
/// # #[derive(Eq, PartialEq, Clone, Hash)]
554-
/// # struct MySocketDescriptor {}
555-
/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
558+
/// # struct SocketDescriptor {}
559+
/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
556560
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
557561
/// # fn disconnect_socket(&mut self) {}
558562
/// # }
559-
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
560-
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
561-
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
562-
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
563-
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
564-
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
565-
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
566-
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
567-
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
568-
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
569-
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
570-
/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
571-
///
572-
/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
573-
/// let background_persister = Arc::clone(&my_persister);
574-
/// let background_event_handler = Arc::clone(&my_event_handler);
575-
/// let background_chain_mon = Arc::clone(&my_chain_monitor);
576-
/// let background_chan_man = Arc::clone(&my_channel_manager);
577-
/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
578-
/// let background_peer_man = Arc::clone(&my_peer_manager);
579-
/// let background_logger = Arc::clone(&my_logger);
580-
/// let background_scorer = Arc::clone(&my_scorer);
563+
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>>;
564+
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
565+
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
566+
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
567+
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
568+
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
569+
/// #
570+
/// # struct Node<
571+
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
572+
/// # F: lightning::chain::Filter + Send + Sync + 'static,
573+
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
574+
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
575+
/// # > {
576+
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
577+
/// # event_handler: Arc<EventHandler>,
578+
/// # channel_manager: Arc<ChannelManager<B, F, FE>>,
579+
/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
580+
/// # gossip_sync: Arc<P2PGossipSync<UL>>,
581+
/// # persister: Arc<Store>,
582+
/// # logger: Arc<Logger>,
583+
/// # scorer: Arc<Scorer>,
584+
/// # }
585+
/// #
586+
/// # async fn setup_background_processing<
587+
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
588+
/// # F: lightning::chain::Filter + Send + Sync + 'static,
589+
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
590+
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
591+
/// # >(node: Node<B, F, FE, UL>) {
592+
/// let background_persister = Arc::clone(&node.persister);
593+
/// let background_event_handler = Arc::clone(&node.event_handler);
594+
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
595+
/// let background_chan_man = Arc::clone(&node.channel_manager);
596+
/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
597+
/// let background_peer_man = Arc::clone(&node.peer_manager);
598+
/// let background_logger = Arc::clone(&node.logger);
599+
/// let background_scorer = Arc::clone(&node.scorer);
581600
///
582601
/// // Setup the sleeper.
583602
/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
@@ -607,9 +626,9 @@ use core::task;
607626
/// sleeper,
608627
/// mobile_interruptable_platform,
609628
/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
610-
/// )
611-
/// .await
612-
/// .expect("Failed to process events");
629+
/// )
630+
/// .await
631+
/// .expect("Failed to process events");
613632
/// });
614633
///
615634
/// // Stop the background processing.
@@ -622,21 +641,16 @@ pub async fn process_events_async<
622641
'a,
623642
UL: 'static + Deref + Send + Sync,
624643
CF: 'static + Deref + Send + Sync,
625-
CW: 'static + Deref + Send + Sync,
626644
T: 'static + Deref + Send + Sync,
627-
ES: 'static + Deref + Send + Sync,
628-
NS: 'static + Deref + Send + Sync,
629-
SP: 'static + Deref + Send + Sync,
630645
F: 'static + Deref + Send + Sync,
631-
R: 'static + Deref + Send + Sync,
632646
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
633647
L: 'static + Deref + Send + Sync,
634648
P: 'static + Deref + Send + Sync,
635649
EventHandlerFuture: core::future::Future<Output = ()>,
636650
EventHandler: Fn(Event) -> EventHandlerFuture,
637651
PS: 'static + Deref + Send,
638-
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
639-
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
652+
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
653+
CM: 'static + Deref + Send + Sync,
640654
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
641655
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
642656
PM: 'static + Deref + Send + Sync,
@@ -653,16 +667,12 @@ pub async fn process_events_async<
653667
where
654668
UL::Target: 'static + UtxoLookup,
655669
CF::Target: 'static + chain::Filter,
656-
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
657670
T::Target: 'static + BroadcasterInterface,
658-
ES::Target: 'static + EntropySource,
659-
NS::Target: 'static + NodeSigner,
660-
SP::Target: 'static + SignerProvider,
661671
F::Target: 'static + FeeEstimator,
662-
R::Target: 'static + Router,
663672
L::Target: 'static + Logger,
664-
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
665-
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
673+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
674+
PS::Target: 'static + Persister<'a, CM, L, SC>,
675+
CM::Target: AChannelManager + Send + Sync,
666676
PM::Target: APeerManager + Send + Sync,
667677
{
668678
let mut should_break = false;
@@ -693,11 +703,11 @@ where
693703
define_run_body!(
694704
persister, chain_monitor,
695705
chain_monitor.process_pending_events_async(async_event_handler).await,
696-
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
706+
channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
697707
peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
698708
gossip_sync, logger, scorer, should_break, {
699709
let fut = Selector {
700-
a: channel_manager.get_event_or_persistence_needed_future(),
710+
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
701711
b: chain_monitor.get_update_future(),
702712
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
703713
};
@@ -788,20 +798,15 @@ impl BackgroundProcessor {
788798
'a,
789799
UL: 'static + Deref + Send + Sync,
790800
CF: 'static + Deref + Send + Sync,
791-
CW: 'static + Deref + Send + Sync,
792801
T: 'static + Deref + Send + Sync,
793-
ES: 'static + Deref + Send + Sync,
794-
NS: 'static + Deref + Send + Sync,
795-
SP: 'static + Deref + Send + Sync,
796802
F: 'static + Deref + Send + Sync,
797-
R: 'static + Deref + Send + Sync,
798803
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
799804
L: 'static + Deref + Send + Sync,
800805
P: 'static + Deref + Send + Sync,
801806
EH: 'static + EventHandler + Send,
802807
PS: 'static + Deref + Send,
803-
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
804-
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
808+
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
809+
CM: 'static + Deref + Send + Sync,
805810
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
806811
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
807812
PM: 'static + Deref + Send + Sync,
@@ -814,16 +819,12 @@ impl BackgroundProcessor {
814819
where
815820
UL::Target: 'static + UtxoLookup,
816821
CF::Target: 'static + chain::Filter,
817-
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
818822
T::Target: 'static + BroadcasterInterface,
819-
ES::Target: 'static + EntropySource,
820-
NS::Target: 'static + NodeSigner,
821-
SP::Target: 'static + SignerProvider,
822823
F::Target: 'static + FeeEstimator,
823-
R::Target: 'static + Router,
824824
L::Target: 'static + Logger,
825-
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
826-
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
825+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
826+
PS::Target: 'static + Persister<'a, CM, L, SC>,
827+
CM::Target: AChannelManager + Send + Sync,
827828
PM::Target: APeerManager + Send + Sync,
828829
{
829830
let stop_thread = Arc::new(AtomicBool::new(false));
@@ -849,12 +850,12 @@ impl BackgroundProcessor {
849850
};
850851
define_run_body!(
851852
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
852-
channel_manager, channel_manager.process_pending_events(&event_handler),
853+
channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
853854
peer_manager,
854855
peer_manager.onion_message_handler().process_pending_events(&event_handler),
855856
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
856857
{ Sleeper::from_two_futures(
857-
&channel_manager.get_event_or_persistence_needed_future(),
858+
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
858859
&chain_monitor.get_update_future()
859860
).wait_timeout(Duration::from_millis(100)); },
860861
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,

0 commit comments

Comments
 (0)