Skip to content

Commit f372c16

Browse files
committed
Utility for syncing a set of chain listeners
Add a utility for syncing a set of chain listeners to a common chain tip. Required to use before creating an SpvClient when the chain listener used with the client is actually a set of listeners each of which may have had left off at a different block. This would occur when the listeners had been persisted individually at different frequencies (e.g., a ChainMonitor's individual ChannelMonitors).
1 parent 53ebfb9 commit f372c16

File tree

2 files changed

+350
-55
lines changed

2 files changed

+350
-55
lines changed

lightning-block-sync/src/init.rs

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier};
2+
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
3+
4+
use bitcoin::blockdata::block::{Block, BlockHeader};
5+
use bitcoin::hash_types::BlockHash;
6+
use bitcoin::network::constants::Network;
7+
8+
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
9+
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
10+
///
11+
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
12+
/// failure, each listener may be left at a different block hash than the one it was originally
13+
/// paired with.
14+
///
15+
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
16+
/// switching to [`SpvClient`].
17+
///
18+
/// [`SpvClient`]: ../struct.SpvClient.html
19+
/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html
20+
/// [`ChannelMonitor`]: ../../lightning/chain/channelmonitor/struct.ChannelMonitor.html
21+
pub async fn sync_listeners<B: BlockSource, C: Cache>(
22+
block_source: &mut B,
23+
network: Network,
24+
header_cache: &mut C,
25+
mut chain_listeners: Vec<(BlockHash, &mut dyn ChainListener)>,
26+
) -> BlockSourceResult<ValidatedBlockHeader> {
27+
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
28+
let new_header = block_source
29+
.get_header(&best_block_hash, best_block_height).await?
30+
.validate(best_block_hash)?;
31+
32+
// Fetch the header for the block hash paired with each listener.
33+
let mut chain_listeners_with_old_headers = Vec::new();
34+
for (old_block, chain_listener) in chain_listeners.drain(..) {
35+
let old_header = match header_cache.look_up(&old_block) {
36+
Some(header) => *header,
37+
None => block_source
38+
.get_header(&old_block, None).await?
39+
.validate(old_block)?
40+
};
41+
chain_listeners_with_old_headers.push((old_header, chain_listener))
42+
}
43+
44+
// Find differences and disconnect blocks for each listener individually.
45+
let mut chain_poller = ChainPoller::new(block_source, network);
46+
let mut chain_listeners_at_height = Vec::new();
47+
let mut most_common_ancestor = None;
48+
let mut most_connected_blocks = Vec::new();
49+
for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
50+
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
51+
let header_cache = &mut ReadOnlyCache(header_cache);
52+
let mut chain_notifier = ChainNotifier { header_cache };
53+
let difference =
54+
chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?;
55+
chain_notifier.disconnect_blocks(
56+
difference.disconnected_blocks,
57+
&mut DynamicChainListener(chain_listener),
58+
);
59+
60+
// Keep track of the most common ancestor and all blocks connected across all listeners.
61+
chain_listeners_at_height.push((difference.common_ancestor.height, chain_listener));
62+
if difference.connected_blocks.len() > most_connected_blocks.len() {
63+
most_common_ancestor = Some(difference.common_ancestor);
64+
most_connected_blocks = difference.connected_blocks;
65+
}
66+
}
67+
68+
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
69+
if let Some(common_ancestor) = most_common_ancestor {
70+
let mut chain_notifier = ChainNotifier { header_cache };
71+
let mut chain_listener = ChainListenerSet(chain_listeners_at_height);
72+
chain_notifier.connect_blocks(
73+
common_ancestor,
74+
most_connected_blocks,
75+
&mut chain_poller,
76+
&mut chain_listener,
77+
).await.or_else(|(e, _)| Err(e))?;
78+
}
79+
80+
Ok(new_header)
81+
}
82+
83+
/// A wrapper to make a cache read-only.
84+
///
85+
/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one
86+
/// listener.
87+
struct ReadOnlyCache<'a, C: Cache>(&'a mut C);
88+
89+
impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
90+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
91+
self.0.look_up(block_hash)
92+
}
93+
94+
fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
95+
unreachable!()
96+
}
97+
98+
fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
99+
None
100+
}
101+
}
102+
103+
/// Wrapper for supporting dynamically sized chain listeners.
104+
struct DynamicChainListener<'a>(&'a mut dyn ChainListener);
105+
106+
impl<'a> ChainListener for DynamicChainListener<'a> {
107+
fn block_connected(&mut self, _block: &Block, _height: u32) {
108+
unreachable!()
109+
}
110+
111+
fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
112+
self.0.block_disconnected(header, height)
113+
}
114+
}
115+
116+
/// A set of dynamically sized chain listeners, each paired with a starting block height.
117+
struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn ChainListener)>);
118+
119+
impl<'a> ChainListener for ChainListenerSet<'a> {
120+
fn block_connected(&mut self, block: &Block, height: u32) {
121+
for (starting_height, chain_listener) in self.0.iter_mut() {
122+
if height > *starting_height {
123+
chain_listener.block_connected(block, height);
124+
}
125+
}
126+
}
127+
128+
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {
129+
unreachable!()
130+
}
131+
}
132+
133+
#[cfg(test)]
134+
mod tests {
135+
use crate::test_utils::{Blockchain, MockChainListener};
136+
use super::*;
137+
138+
use bitcoin::network::constants::Network;
139+
140+
#[tokio::test]
141+
async fn sync_from_same_chain() {
142+
let mut chain = Blockchain::default().with_height(4);
143+
144+
let mut listener_1 = MockChainListener::new()
145+
.expect_block_connected(*chain.at_height(2))
146+
.expect_block_connected(*chain.at_height(3))
147+
.expect_block_connected(*chain.at_height(4));
148+
let mut listener_2 = MockChainListener::new()
149+
.expect_block_connected(*chain.at_height(3))
150+
.expect_block_connected(*chain.at_height(4));
151+
let mut listener_3 = MockChainListener::new()
152+
.expect_block_connected(*chain.at_height(4));
153+
154+
let listeners = vec![
155+
(chain.at_height(1).block_hash, &mut listener_1 as &mut dyn ChainListener),
156+
(chain.at_height(2).block_hash, &mut listener_2 as &mut dyn ChainListener),
157+
(chain.at_height(3).block_hash, &mut listener_3 as &mut dyn ChainListener),
158+
];
159+
let mut cache = chain.header_cache(0..=4);
160+
match sync_listeners(&mut chain, Network::Bitcoin, &mut cache, listeners).await {
161+
Ok(header) => assert_eq!(header, chain.tip()),
162+
Err(e) => panic!("Unexpected error: {:?}", e),
163+
}
164+
}
165+
166+
#[tokio::test]
167+
async fn sync_from_different_chains() {
168+
let mut main_chain = Blockchain::default().with_height(4);
169+
let fork_chain_1 = main_chain.fork_at_height(1);
170+
let fork_chain_2 = main_chain.fork_at_height(2);
171+
let fork_chain_3 = main_chain.fork_at_height(3);
172+
173+
let mut listener_1 = MockChainListener::new()
174+
.expect_block_disconnected(*fork_chain_1.at_height(4))
175+
.expect_block_disconnected(*fork_chain_1.at_height(3))
176+
.expect_block_disconnected(*fork_chain_1.at_height(2))
177+
.expect_block_connected(*main_chain.at_height(2))
178+
.expect_block_connected(*main_chain.at_height(3))
179+
.expect_block_connected(*main_chain.at_height(4));
180+
let mut listener_2 = MockChainListener::new()
181+
.expect_block_disconnected(*fork_chain_2.at_height(4))
182+
.expect_block_disconnected(*fork_chain_2.at_height(3))
183+
.expect_block_connected(*main_chain.at_height(3))
184+
.expect_block_connected(*main_chain.at_height(4));
185+
let mut listener_3 = MockChainListener::new()
186+
.expect_block_disconnected(*fork_chain_3.at_height(4))
187+
.expect_block_connected(*main_chain.at_height(4));
188+
189+
let listeners = vec![
190+
(fork_chain_1.tip().block_hash, &mut listener_1 as &mut dyn ChainListener),
191+
(fork_chain_2.tip().block_hash, &mut listener_2 as &mut dyn ChainListener),
192+
(fork_chain_3.tip().block_hash, &mut listener_3 as &mut dyn ChainListener),
193+
];
194+
let mut cache = fork_chain_1.header_cache(2..=4);
195+
cache.extend(fork_chain_2.header_cache(3..=4));
196+
cache.extend(fork_chain_3.header_cache(4..=4));
197+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
198+
Ok(header) => assert_eq!(header, main_chain.tip()),
199+
Err(e) => panic!("Unexpected error: {:?}", e),
200+
}
201+
}
202+
203+
#[tokio::test]
204+
async fn sync_from_overlapping_chains() {
205+
let mut main_chain = Blockchain::default().with_height(4);
206+
let fork_chain_1 = main_chain.fork_at_height(1);
207+
let fork_chain_2 = fork_chain_1.fork_at_height(2);
208+
let fork_chain_3 = fork_chain_2.fork_at_height(3);
209+
210+
let mut listener_1 = MockChainListener::new()
211+
.expect_block_disconnected(*fork_chain_1.at_height(4))
212+
.expect_block_disconnected(*fork_chain_1.at_height(3))
213+
.expect_block_disconnected(*fork_chain_1.at_height(2))
214+
.expect_block_connected(*main_chain.at_height(2))
215+
.expect_block_connected(*main_chain.at_height(3))
216+
.expect_block_connected(*main_chain.at_height(4));
217+
let mut listener_2 = MockChainListener::new()
218+
.expect_block_disconnected(*fork_chain_2.at_height(4))
219+
.expect_block_disconnected(*fork_chain_2.at_height(3))
220+
.expect_block_disconnected(*fork_chain_2.at_height(2))
221+
.expect_block_connected(*main_chain.at_height(2))
222+
.expect_block_connected(*main_chain.at_height(3))
223+
.expect_block_connected(*main_chain.at_height(4));
224+
let mut listener_3 = MockChainListener::new()
225+
.expect_block_disconnected(*fork_chain_3.at_height(4))
226+
.expect_block_disconnected(*fork_chain_3.at_height(3))
227+
.expect_block_disconnected(*fork_chain_3.at_height(2))
228+
.expect_block_connected(*main_chain.at_height(2))
229+
.expect_block_connected(*main_chain.at_height(3))
230+
.expect_block_connected(*main_chain.at_height(4));
231+
232+
let listeners = vec![
233+
(fork_chain_1.tip().block_hash, &mut listener_1 as &mut dyn ChainListener),
234+
(fork_chain_2.tip().block_hash, &mut listener_2 as &mut dyn ChainListener),
235+
(fork_chain_3.tip().block_hash, &mut listener_3 as &mut dyn ChainListener),
236+
];
237+
let mut cache = fork_chain_1.header_cache(2..=4);
238+
cache.extend(fork_chain_2.header_cache(3..=4));
239+
cache.extend(fork_chain_3.header_cache(4..=4));
240+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
241+
Ok(header) => assert_eq!(header, main_chain.tip()),
242+
Err(e) => panic!("Unexpected error: {:?}", e),
243+
}
244+
}
245+
246+
#[tokio::test]
247+
async fn cache_connected_and_keep_disconnected_blocks() {
248+
let mut main_chain = Blockchain::default().with_height(2);
249+
let fork_chain = main_chain.fork_at_height(1);
250+
let new_tip = main_chain.tip();
251+
let old_tip = fork_chain.tip();
252+
253+
let mut listener = MockChainListener::new()
254+
.expect_block_disconnected(*old_tip)
255+
.expect_block_connected(*new_tip);
256+
257+
let listeners = vec![(old_tip.block_hash, &mut listener as &mut dyn ChainListener)];
258+
let mut cache = fork_chain.header_cache(2..=2);
259+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
260+
Ok(_) => {
261+
assert!(cache.contains_key(&new_tip.block_hash));
262+
assert!(cache.contains_key(&old_tip.block_hash));
263+
},
264+
Err(e) => panic!("Unexpected error: {:?}", e),
265+
}
266+
}
267+
}

0 commit comments

Comments
 (0)