Skip to content

Commit e012c4c

Browse files
committed
WIP: Common ChainListener implementations and example
1 parent 6be8ccb commit e012c4c

File tree

7 files changed

+310
-110
lines changed

7 files changed

+310
-110
lines changed

lightning-block-sync/src/init.rs

Lines changed: 106 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier};
1+
use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier};
22
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
33

44
use bitcoin::blockdata::block::{Block, BlockHeader};
55
use bitcoin::hash_types::BlockHash;
66
use bitcoin::network::constants::Network;
77

8+
use lightning::chain::ChainListener;
9+
810
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
911
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
1012
///
@@ -13,7 +15,88 @@ use bitcoin::network::constants::Network;
1315
/// paired with.
1416
///
1517
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
16-
/// switching to [`SpvClient`].
18+
/// switching to [`SpvClient`]. For example:
19+
///
20+
/// ```
21+
/// use bitcoin::hash_types::BlockHash;
22+
/// use bitcoin::network::constants::Network;
23+
///
24+
/// use lightning::chain;
25+
/// use lightning::chain::ChainListener;
26+
/// use lightning::chain::Watch;
27+
/// use lightning::chain::chainmonitor::ChainMonitor;
28+
/// use lightning::chain::channelmonitor;
29+
/// use lightning::chain::channelmonitor::ChannelMonitor;
30+
/// use lightning::chain::chaininterface::BroadcasterInterface;
31+
/// use lightning::chain::chaininterface::FeeEstimator;
32+
/// use lightning::chain::keysinterface;
33+
/// use lightning::chain::keysinterface::KeysInterface;
34+
/// use lightning::ln::channelmanager::ChannelManager;
35+
/// use lightning::ln::channelmanager::ChannelManagerReadArgs;
36+
/// use lightning::util::config::UserConfig;
37+
/// use lightning::util::logger::Logger;
38+
/// use lightning::util::ser::ReadableArgs;
39+
///
40+
/// use lightning_block_sync::*;
41+
///
42+
/// use std::cell::RefCell;
43+
/// use std::io::Cursor;
44+
///
45+
/// async fn init_sync<
46+
/// B: BlockSource,
47+
/// K: KeysInterface<Signer = S>,
48+
/// S: keysinterface::Sign,
49+
/// T: BroadcasterInterface,
50+
/// F: FeeEstimator,
51+
/// L: Logger,
52+
/// C: chain::Filter,
53+
/// P: channelmonitor::Persist<S>,
54+
/// >(
55+
/// block_source: &mut B,
56+
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
57+
/// config: UserConfig,
58+
/// keys_manager: &K,
59+
/// tx_broadcaster: &T,
60+
/// fee_estimator: &F,
61+
/// logger: &L,
62+
/// persister: &P,
63+
/// ) {
64+
/// let serialized_monitor = "...";
65+
/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor<S>)>::read(
66+
/// &mut Cursor::new(&serialized_monitor), keys_manager).unwrap();
67+
///
68+
/// let serialized_manager = "...";
69+
/// let (manager_block_hash, mut manager) = {
70+
/// let read_args = ChannelManagerReadArgs::new(
71+
/// keys_manager,
72+
/// fee_estimator,
73+
/// chain_monitor,
74+
/// tx_broadcaster,
75+
/// logger,
76+
/// config,
77+
/// vec![&mut monitor],
78+
/// );
79+
/// <(BlockHash, ChannelManager<S, &ChainMonitor<S, &C, &T, &F, &L, &P>, &T, &K, &F, &L>)>::read(
80+
/// &mut Cursor::new(&serialized_manager), read_args).unwrap()
81+
/// };
82+
///
83+
/// let mut cache = UnboundedCache::new();
84+
/// let mut monitor_listener = (RefCell::new(monitor), &*tx_broadcaster, &*fee_estimator, &*logger);
85+
/// let listeners = vec![
86+
/// (monitor_block_hash, &mut monitor_listener as &mut dyn ChainListener),
87+
/// (manager_block_hash, &mut manager as &mut dyn ChainListener),
88+
/// ];
89+
/// let chain_tip =
90+
/// init::sync_listeners(block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
91+
///
92+
/// let monitor = monitor_listener.0.into_inner();
93+
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
94+
///
95+
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
96+
/// let mut chain_listener = (chain_monitor, &manager);
97+
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
98+
/// }
99+
/// ```
17100
///
18101
/// [`SpvClient`]: ../struct.SpvClient.html
19102
/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html
@@ -49,32 +132,29 @@ pub async fn sync_listeners<B: BlockSource, C: Cache>(
49132
for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
50133
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
51134
let header_cache = &mut ReadOnlyCache(header_cache);
52-
let mut chain_notifier = ChainNotifier { header_cache };
53-
let difference =
54-
chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?;
55-
chain_notifier.disconnect_blocks(
56-
difference.disconnected_blocks,
57-
&mut DynamicChainListener(chain_listener),
58-
);
135+
let (common_ancestor, connected_blocks) = {
136+
let chain_listener = &DynamicChainListener(chain_listener);
137+
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
138+
let difference =
139+
chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?;
140+
chain_notifier.disconnect_blocks(difference.disconnected_blocks);
141+
(difference.common_ancestor, difference.connected_blocks)
142+
};
59143

60144
// Keep track of the most common ancestor and all blocks connected across all listeners.
61-
chain_listeners_at_height.push((difference.common_ancestor.height, chain_listener));
62-
if difference.connected_blocks.len() > most_connected_blocks.len() {
63-
most_common_ancestor = Some(difference.common_ancestor);
64-
most_connected_blocks = difference.connected_blocks;
145+
chain_listeners_at_height.push((common_ancestor.height, chain_listener));
146+
if connected_blocks.len() > most_connected_blocks.len() {
147+
most_common_ancestor = Some(common_ancestor);
148+
most_connected_blocks = connected_blocks;
65149
}
66150
}
67151

68152
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
69153
if let Some(common_ancestor) = most_common_ancestor {
70-
let mut chain_notifier = ChainNotifier { header_cache };
71-
let mut chain_listener = ChainListenerSet(chain_listeners_at_height);
72-
chain_notifier.connect_blocks(
73-
common_ancestor,
74-
most_connected_blocks,
75-
&mut chain_poller,
76-
&mut chain_listener,
77-
).await.or_else(|(e, _)| Err(e))?;
154+
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
155+
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
156+
chain_notifier.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
157+
.await.or_else(|(e, _)| Err(e))?;
78158
}
79159

80160
Ok(new_header)
@@ -104,11 +184,11 @@ impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
104184
struct DynamicChainListener<'a>(&'a mut dyn ChainListener);
105185

106186
impl<'a> ChainListener for DynamicChainListener<'a> {
107-
fn block_connected(&mut self, _block: &Block, _height: u32) {
187+
fn block_connected(&self, _block: &Block, _height: u32) {
108188
unreachable!()
109189
}
110190

111-
fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
191+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
112192
self.0.block_disconnected(header, height)
113193
}
114194
}
@@ -117,15 +197,15 @@ impl<'a> ChainListener for DynamicChainListener<'a> {
117197
struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn ChainListener)>);
118198

119199
impl<'a> ChainListener for ChainListenerSet<'a> {
120-
fn block_connected(&mut self, block: &Block, height: u32) {
121-
for (starting_height, chain_listener) in self.0.iter_mut() {
200+
fn block_connected(&self, block: &Block, height: u32) {
201+
for (starting_height, chain_listener) in self.0.iter() {
122202
if height > *starting_height {
123203
chain_listener.block_connected(block, height);
124204
}
125205
}
126206
}
127207

128-
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {
208+
fn block_disconnected(&self, _header: &BlockHeader, _height: u32) {
129209
unreachable!()
130210
}
131211
}

0 commit comments

Comments
 (0)