Skip to content

Commit 99881bd

Browse files
committed
WIP: New interface for syncing multiple listeners
1 parent c12631d commit 99881bd

File tree

3 files changed

+304
-75
lines changed

3 files changed

+304
-75
lines changed

lightning-block-sync/src/init.rs

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier};
2+
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
3+
4+
use bitcoin::blockdata::block::{Block, BlockHeader};
5+
use bitcoin::hash_types::BlockHash;
6+
use bitcoin::network::constants::Network;
7+
8+
/// Unbounded cache of block headers keyed by block hash.
9+
pub type UnboundedCache = std::collections::HashMap<BlockHash, ValidatedBlockHeader>;
10+
11+
impl Cache for UnboundedCache {
12+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
13+
self.get(block_hash)
14+
}
15+
16+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
17+
self.insert(block_hash, block_header);
18+
}
19+
20+
fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
21+
None
22+
}
23+
}
24+
25+
/// Performs a one-time sync of chain listeners from a single *trusted* block source, bringing their
26+
/// view of the latest chain tip to `new_block`.
27+
///
28+
/// Useful upon startup to bring each `ChannelMonitor` and the `ChannelManager` in sync, before
29+
/// switching to `SpvClient`.
30+
pub async fn sync_listeners<B: BlockSource, C: Cache>(
31+
new_block: BlockHash,
32+
block_source: &mut B,
33+
network: Network,
34+
chain_notifier: &mut ChainNotifier<C>,
35+
mut chain_listeners: Vec<(BlockHash, &mut dyn ChainListener)>,
36+
) -> BlockSourceResult<()> {
37+
let new_header = match chain_notifier.header_cache.look_up(&new_block) {
38+
Some(header) => *header,
39+
None => block_source
40+
.get_header(&new_block, None).await?
41+
.validate(new_block)?,
42+
};
43+
44+
// Find differences and disconnect blocks for each listener individually.
45+
let mut chain_listeners_with_common_ancestors = Vec::new();
46+
let mut most_common_ancestor = None;
47+
let mut most_connected_blocks = Vec::new();
48+
for (old_block, chain_listener) in chain_listeners.drain(..) {
49+
let old_header = match chain_notifier.header_cache.look_up(&old_block) {
50+
Some(header) => *header,
51+
None => block_source
52+
.get_header(&old_block, None).await?
53+
.validate(old_block)?
54+
};
55+
56+
// Disconnect any stale blocks.
57+
let mut chain_poller = ChainPoller::new(block_source as &mut dyn BlockSource, network);
58+
let difference =
59+
chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?;
60+
chain_notifier.disconnect_blocks(
61+
difference.disconnected_blocks,
62+
&mut DynamicChainListener(chain_listener),
63+
);
64+
65+
// Keep track of the most common ancestor and all blocks connected across all listeners.
66+
chain_listeners_with_common_ancestors.push((difference.common_ancestor, chain_listener));
67+
if difference.connected_blocks.len() > most_connected_blocks.len() {
68+
most_common_ancestor = Some(difference.common_ancestor);
69+
most_connected_blocks = difference.connected_blocks;
70+
}
71+
}
72+
73+
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
74+
if let Some(common_ancestor) = most_common_ancestor {
75+
let mut chain_poller = ChainPoller::new(block_source as &mut dyn BlockSource, network);
76+
let mut chain_listener = ChainListenerSet(chain_listeners_with_common_ancestors);
77+
chain_notifier.connect_blocks(
78+
common_ancestor,
79+
most_connected_blocks,
80+
&mut chain_poller,
81+
&mut chain_listener,
82+
).await.unwrap();
83+
}
84+
Ok(())
85+
}
86+
87+
struct DynamicChainListener<'a>(&'a mut dyn ChainListener);
88+
89+
impl<'a> ChainListener for DynamicChainListener<'a> {
90+
fn block_connected(&mut self, _block: &Block, _height: u32) {
91+
unreachable!()
92+
}
93+
94+
fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
95+
self.0.block_disconnected(header, height)
96+
}
97+
}
98+
99+
struct ChainListenerSet<'a>(Vec<(ValidatedBlockHeader, &'a mut dyn ChainListener)>);
100+
101+
impl<'a> ChainListener for ChainListenerSet<'a> {
102+
fn block_connected(&mut self, block: &Block, height: u32) {
103+
for (header, chain_listener) in self.0.iter_mut() {
104+
if height > header.height {
105+
chain_listener.block_connected(block, height);
106+
}
107+
}
108+
}
109+
110+
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {
111+
unreachable!()
112+
}
113+
}
114+
115+
#[cfg(test)]
116+
mod tests {
117+
use crate::test_utils::{Blockchain, MockChainListener};
118+
use super::*;
119+
120+
use bitcoin::network::constants::Network;
121+
122+
#[tokio::test]
123+
async fn sync_from_same_chain() {
124+
let mut chain = Blockchain::default().with_height(4);
125+
let new_tip = chain.tip();
126+
let old_tip_1 = chain.at_height(1);
127+
let old_tip_2 = chain.at_height(2);
128+
let old_tip_3 = chain.at_height(3);
129+
130+
let mut listener_1 = MockChainListener::new()
131+
.expect_block_connected(*old_tip_2)
132+
.expect_block_connected(*old_tip_3)
133+
.expect_block_connected(*new_tip);
134+
let mut listener_2 = MockChainListener::new()
135+
.expect_block_connected(*old_tip_3)
136+
.expect_block_connected(*new_tip);
137+
let mut listener_3 = MockChainListener::new()
138+
.expect_block_connected(*new_tip);
139+
140+
let listeners = vec![
141+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
142+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
143+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
144+
];
145+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=4) };
146+
match sync_listeners(new_tip.block_hash, &mut chain, Network::Bitcoin, &mut notifier, listeners).await {
147+
Ok(()) => {},
148+
Err(e) => panic!("Unexpected error: {:?}", e),
149+
}
150+
}
151+
152+
#[tokio::test]
153+
async fn sync_from_different_chains() {
154+
let mut main_chain = Blockchain::default().with_height(4);
155+
let fork_chain_1 = main_chain.fork_at_height(1);
156+
let fork_chain_2 = main_chain.fork_at_height(2);
157+
let fork_chain_3 = main_chain.fork_at_height(3);
158+
159+
let new_tip = main_chain.tip();
160+
let old_tip_1 = fork_chain_1.tip();
161+
let old_tip_2 = fork_chain_2.tip();
162+
let old_tip_3 = fork_chain_3.tip();
163+
164+
let mut listener_1 = MockChainListener::new()
165+
.expect_block_disconnected(*fork_chain_1.at_height(4))
166+
.expect_block_disconnected(*fork_chain_1.at_height(3))
167+
.expect_block_disconnected(*fork_chain_1.at_height(2))
168+
.expect_block_connected(*main_chain.at_height(2))
169+
.expect_block_connected(*main_chain.at_height(3))
170+
.expect_block_connected(*main_chain.at_height(4));
171+
let mut listener_2 = MockChainListener::new()
172+
.expect_block_disconnected(*fork_chain_2.at_height(4))
173+
.expect_block_disconnected(*fork_chain_2.at_height(3))
174+
.expect_block_connected(*main_chain.at_height(3))
175+
.expect_block_connected(*main_chain.at_height(4));
176+
let mut listener_3 = MockChainListener::new()
177+
.expect_block_disconnected(*fork_chain_3.at_height(4))
178+
.expect_block_connected(*main_chain.at_height(4));
179+
180+
let listeners = vec![
181+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
182+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
183+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
184+
];
185+
let mut header_cache = fork_chain_1.header_cache(2..=4);
186+
header_cache.extend(fork_chain_2.header_cache(3..=4));
187+
header_cache.extend(fork_chain_3.header_cache(4..=4));
188+
let mut notifier = ChainNotifier { header_cache };
189+
match sync_listeners(new_tip.block_hash, &mut main_chain, Network::Bitcoin, &mut notifier, listeners).await {
190+
Ok(()) => {},
191+
Err(e) => panic!("Unexpected error: {:?}", e),
192+
}
193+
}
194+
195+
#[tokio::test]
196+
async fn sync_from_overlapping_chains() {
197+
let mut main_chain = Blockchain::default().with_height(4);
198+
let fork_chain_1 = main_chain.fork_at_height(1);
199+
let fork_chain_2 = fork_chain_1.fork_at_height(2);
200+
let fork_chain_3 = fork_chain_2.fork_at_height(3);
201+
202+
let new_tip = main_chain.tip();
203+
let old_tip_1 = fork_chain_1.tip();
204+
let old_tip_2 = fork_chain_2.tip();
205+
let old_tip_3 = fork_chain_3.tip();
206+
207+
let mut listener_1 = MockChainListener::new()
208+
.expect_block_disconnected(*fork_chain_1.at_height(4))
209+
.expect_block_disconnected(*fork_chain_1.at_height(3))
210+
.expect_block_disconnected(*fork_chain_1.at_height(2))
211+
.expect_block_connected(*main_chain.at_height(2))
212+
.expect_block_connected(*main_chain.at_height(3))
213+
.expect_block_connected(*main_chain.at_height(4));
214+
let mut listener_2 = MockChainListener::new()
215+
.expect_block_disconnected(*fork_chain_2.at_height(4))
216+
.expect_block_disconnected(*fork_chain_2.at_height(3))
217+
.expect_block_disconnected(*fork_chain_2.at_height(2))
218+
.expect_block_connected(*main_chain.at_height(2))
219+
.expect_block_connected(*main_chain.at_height(3))
220+
.expect_block_connected(*main_chain.at_height(4));
221+
let mut listener_3 = MockChainListener::new()
222+
.expect_block_disconnected(*fork_chain_3.at_height(4))
223+
.expect_block_disconnected(*fork_chain_3.at_height(3))
224+
.expect_block_disconnected(*fork_chain_3.at_height(2))
225+
.expect_block_connected(*main_chain.at_height(2))
226+
.expect_block_connected(*main_chain.at_height(3))
227+
.expect_block_connected(*main_chain.at_height(4));
228+
229+
let listeners = vec![
230+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
231+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
232+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
233+
];
234+
let mut header_cache = fork_chain_1.header_cache(2..=4);
235+
header_cache.extend(fork_chain_2.header_cache(3..=4));
236+
header_cache.extend(fork_chain_3.header_cache(4..=4));
237+
let mut notifier = ChainNotifier { header_cache };
238+
match sync_listeners(new_tip.block_hash, &mut main_chain, Network::Bitcoin, &mut notifier, listeners).await {
239+
Ok(()) => {},
240+
Err(e) => panic!("Unexpected error: {:?}", e),
241+
}
242+
}
243+
}

0 commit comments

Comments
 (0)