Skip to content

Commit b1ecfe7

Browse files
committed
Add ChainNotifier and define ChainListener trait
Add an interface for being notified of block connected and disconnected events, along with a notifier for generating such events. Used while polling block sources for a new tip in order to feed these events into ChannelManager and ChainMonitor.
1 parent 05ec061 commit b1ecfe7

File tree

3 files changed

+419
-4
lines changed

3 files changed

+419
-4
lines changed

lightning-block-sync/src/lib.rs

Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ mod test_utils;
3131
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
3232
mod utils;
3333

34+
use crate::poll::{Poll, ValidatedBlockHeader};
35+
3436
use bitcoin::blockdata::block::{Block, BlockHeader};
3537
use bitcoin::hash_types::BlockHash;
3638
use bitcoin::util::uint::Uint256;
@@ -130,3 +132,317 @@ pub struct BlockHeaderData {
130132
/// of equivalent weight.
131133
pub chainwork: Uint256,
132134
}
135+
136+
/// Adaptor used for notifying when blocks have been connected or disconnected from the chain.
137+
///
138+
/// Used when needing to replay chain data upon startup or as new chain events occur.
139+
pub trait ChainListener {
140+
/// Notifies the listener that a block was added at the given height.
141+
fn block_connected(&mut self, block: &Block, height: u32);
142+
143+
/// Notifies the listener that a block was removed at the given height.
144+
fn block_disconnected(&mut self, header: &BlockHeader, height: u32);
145+
}
146+
147+
/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
148+
/// keyed by block hash.
149+
///
150+
/// Used by [`ChainNotifier`] to store headers along the best chain, which is important for ensuring
151+
/// that blocks can be disconnected if they are no longer accessible from a block source (e.g., if
152+
/// the block source does not store stale forks indefinitely).
153+
///
154+
/// Implementations may define how long to retain headers such that it's unlikely they will ever be
155+
/// needed to disconnect a block. In cases where block sources provide access to headers on stale
156+
/// forks reliably, caches may be entirely unnecessary.
157+
///
158+
/// [`ChainNotifier`]: struct.ChainNotifier.html
159+
pub trait Cache {
160+
/// Retrieves the block header keyed by the given block hash.
161+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>;
162+
163+
/// Called when a block has been connected to the best chain to ensure it is available to be
164+
/// disconnected later if needed.
165+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader);
166+
167+
/// Called when a block has been disconnected from the best chain. Once disconnected, a block's
168+
/// header is no longer needed and thus can be removed.
169+
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader>;
170+
}
171+
172+
/// Unbounded cache of block headers keyed by block hash.
173+
pub type UnboundedCache = std::collections::HashMap<BlockHash, ValidatedBlockHeader>;
174+
175+
impl Cache for UnboundedCache {
176+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
177+
self.get(block_hash)
178+
}
179+
180+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
181+
self.insert(block_hash, block_header);
182+
}
183+
184+
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
185+
self.remove(block_hash)
186+
}
187+
}
188+
189+
/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
190+
///
191+
/// [listeners]: trait.ChainListener.html
192+
struct ChainNotifier<C: Cache> {
193+
/// Cache for looking up headers before fetching from a block source.
194+
header_cache: C,
195+
}
196+
197+
/// Changes made to the chain between subsequent polls that transformed it from having one chain tip
198+
/// to another.
199+
///
200+
/// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order
201+
/// before new blocks are connected in reverse order.
202+
struct ChainDifference {
203+
/// Blocks that were disconnected from the chain since the last poll.
204+
disconnected_blocks: Vec<ValidatedBlockHeader>,
205+
206+
/// Blocks that were connected to the chain since the last poll.
207+
connected_blocks: Vec<ValidatedBlockHeader>,
208+
}
209+
210+
impl<C: Cache> ChainNotifier<C> {
211+
/// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from
212+
/// `old_header` to get to that point and then connecting blocks until `new_header`.
213+
///
214+
/// Validates headers along the transition path, but doesn't fetch blocks until the chain is
215+
/// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip
216+
/// ended up which may not be `new_header`. Note that the returned `Err` contains `Some` header
217+
/// if and only if the transition from `old_header` to `new_header` is valid.
218+
async fn synchronize_listener<L: ChainListener, P: Poll>(
219+
&mut self,
220+
new_header: ValidatedBlockHeader,
221+
old_header: &ValidatedBlockHeader,
222+
chain_poller: &mut P,
223+
chain_listener: &mut L,
224+
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
225+
let mut difference = self.find_difference(new_header, old_header, chain_poller).await
226+
.map_err(|e| (e, None))?;
227+
228+
let mut new_tip = *old_header;
229+
for header in difference.disconnected_blocks.drain(..) {
230+
if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) {
231+
assert_eq!(cached_header, header);
232+
}
233+
chain_listener.block_disconnected(&header.header, header.height);
234+
new_tip = header;
235+
}
236+
237+
for header in difference.connected_blocks.drain(..).rev() {
238+
let block = chain_poller
239+
.fetch_block(&header).await
240+
.or_else(|e| Err((e, Some(new_tip))))?;
241+
debug_assert_eq!(block.block_hash, header.block_hash);
242+
243+
self.header_cache.block_connected(header.block_hash, header);
244+
chain_listener.block_connected(&block, header.height);
245+
new_tip = header;
246+
}
247+
248+
Ok(())
249+
}
250+
251+
/// Returns the changes needed to produce the chain with `current_header` as its tip from the
252+
/// chain with `prev_header` as its tip.
253+
///
254+
/// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
255+
async fn find_difference<P: Poll>(
256+
&self,
257+
current_header: ValidatedBlockHeader,
258+
prev_header: &ValidatedBlockHeader,
259+
chain_poller: &mut P,
260+
) -> BlockSourceResult<ChainDifference> {
261+
let mut disconnected_blocks = Vec::new();
262+
let mut connected_blocks = Vec::new();
263+
let mut current = current_header;
264+
let mut previous = *prev_header;
265+
loop {
266+
// Found the common ancestor.
267+
if current.block_hash == previous.block_hash {
268+
break;
269+
}
270+
271+
// Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
272+
// the header with the greater height, or both if equal heights.
273+
let current_height = current.height;
274+
let previous_height = previous.height;
275+
if current_height <= previous_height {
276+
disconnected_blocks.push(previous);
277+
previous = self.look_up_previous_header(chain_poller, &previous).await?;
278+
}
279+
if current_height >= previous_height {
280+
connected_blocks.push(current);
281+
current = self.look_up_previous_header(chain_poller, &current).await?;
282+
}
283+
}
284+
285+
Ok(ChainDifference { disconnected_blocks, connected_blocks })
286+
}
287+
288+
/// Returns the previous header for the given header, either by looking it up in the cache or
289+
/// fetching it if not found.
290+
async fn look_up_previous_header<P: Poll>(
291+
&self,
292+
chain_poller: &mut P,
293+
header: &ValidatedBlockHeader,
294+
) -> BlockSourceResult<ValidatedBlockHeader> {
295+
match self.header_cache.look_up(&header.header.prev_blockhash) {
296+
Some(prev_header) => Ok(*prev_header),
297+
None => chain_poller.look_up_previous_header(header).await,
298+
}
299+
}
300+
}
301+
302+
#[cfg(test)]
303+
mod chain_notifier_tests {
304+
use crate::test_utils::{Blockchain, MockChainListener};
305+
use super::*;
306+
307+
use bitcoin::network::constants::Network;
308+
309+
#[tokio::test]
310+
async fn sync_from_same_chain() {
311+
let mut chain = Blockchain::default().with_height(3);
312+
313+
let new_tip = chain.tip();
314+
let old_tip = chain.at_height(1);
315+
let mut listener = MockChainListener::new()
316+
.expect_block_connected(*chain.at_height(2))
317+
.expect_block_connected(*new_tip);
318+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) };
319+
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
320+
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
321+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
322+
Ok(_) => {},
323+
}
324+
}
325+
326+
#[tokio::test]
327+
async fn sync_from_different_chains() {
328+
let mut test_chain = Blockchain::with_network(Network::Testnet).with_height(1);
329+
let main_chain = Blockchain::with_network(Network::Bitcoin).with_height(1);
330+
331+
let new_tip = test_chain.tip();
332+
let old_tip = main_chain.tip();
333+
let mut listener = MockChainListener::new();
334+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=1) };
335+
let mut poller = poll::ChainPoller::new(&mut test_chain, Network::Testnet);
336+
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
337+
Err((e, _)) => {
338+
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
339+
assert_eq!(e.into_inner().as_ref().to_string(), "genesis block reached");
340+
},
341+
Ok(_) => panic!("Expected error"),
342+
}
343+
}
344+
345+
#[tokio::test]
346+
async fn sync_from_equal_length_fork() {
347+
let main_chain = Blockchain::default().with_height(2);
348+
let mut fork_chain = main_chain.fork_at_height(1);
349+
350+
let new_tip = fork_chain.tip();
351+
let old_tip = main_chain.tip();
352+
let mut listener = MockChainListener::new()
353+
.expect_block_disconnected(*old_tip)
354+
.expect_block_connected(*new_tip);
355+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) };
356+
let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
357+
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
358+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
359+
Ok(_) => {},
360+
}
361+
}
362+
363+
#[tokio::test]
364+
async fn sync_from_shorter_fork() {
365+
let main_chain = Blockchain::default().with_height(3);
366+
let mut fork_chain = main_chain.fork_at_height(1);
367+
fork_chain.disconnect_tip();
368+
369+
let new_tip = fork_chain.tip();
370+
let old_tip = main_chain.tip();
371+
let mut listener = MockChainListener::new()
372+
.expect_block_disconnected(*old_tip)
373+
.expect_block_disconnected(*main_chain.at_height(2))
374+
.expect_block_connected(*new_tip);
375+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=3) };
376+
let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
377+
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
378+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
379+
Ok(_) => {},
380+
}
381+
}
382+
383+
#[tokio::test]
384+
async fn sync_from_longer_fork() {
385+
let mut main_chain = Blockchain::default().with_height(3);
386+
let mut fork_chain = main_chain.fork_at_height(1);
387+
main_chain.disconnect_tip();
388+
389+
let new_tip = fork_chain.tip();
390+
let old_tip = main_chain.tip();
391+
let mut listener = MockChainListener::new()
392+
.expect_block_disconnected(*old_tip)
393+
.expect_block_connected(*fork_chain.at_height(2))
394+
.expect_block_connected(*new_tip);
395+
let mut notifier = ChainNotifier { header_cache: main_chain.header_cache(0..=2) };
396+
let mut poller = poll::ChainPoller::new(&mut fork_chain, Network::Testnet);
397+
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
398+
Err((e, _)) => panic!("Unexpected error: {:?}", e),
399+
Ok(_) => {},
400+
}
401+
}
402+
403+
#[tokio::test]
404+
async fn sync_from_chain_without_headers() {
405+
let mut chain = Blockchain::default().with_height(3).without_headers();
406+
407+
let new_tip = chain.tip();
408+
let old_tip = chain.at_height(1);
409+
let mut listener = MockChainListener::new();
410+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=1) };
411+
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
412+
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
413+
Err((_, tip)) => assert_eq!(tip, None),
414+
Ok(_) => panic!("Expected error"),
415+
}
416+
}
417+
418+
#[tokio::test]
419+
async fn sync_from_chain_without_any_new_blocks() {
420+
let mut chain = Blockchain::default().with_height(3).without_blocks(2..);
421+
422+
let new_tip = chain.tip();
423+
let old_tip = chain.at_height(1);
424+
let mut listener = MockChainListener::new();
425+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) };
426+
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
427+
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
428+
Err((_, tip)) => assert_eq!(tip, Some(old_tip)),
429+
Ok(_) => panic!("Expected error"),
430+
}
431+
}
432+
433+
#[tokio::test]
434+
async fn sync_from_chain_without_some_new_blocks() {
435+
let mut chain = Blockchain::default().with_height(3).without_blocks(3..);
436+
437+
let new_tip = chain.tip();
438+
let old_tip = chain.at_height(1);
439+
let mut listener = MockChainListener::new()
440+
.expect_block_connected(*chain.at_height(2));
441+
let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=3) };
442+
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
443+
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller, &mut listener).await {
444+
Err((_, tip)) => assert_eq!(tip, Some(chain.at_height(2))),
445+
Ok(_) => panic!("Expected error"),
446+
}
447+
}
448+
}

lightning-block-sync/src/poll.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl Validate for Block {
9696
/// A block header with validated proof of work and corresponding block hash.
9797
#[derive(Clone, Copy, Debug, PartialEq)]
9898
pub struct ValidatedBlockHeader {
99-
block_hash: BlockHash,
99+
pub(crate) block_hash: BlockHash,
100100
inner: BlockHeaderData,
101101
}
102102

@@ -142,7 +142,7 @@ impl ValidatedBlockHeader {
142142

143143
/// A block with validated data against its transaction list and corresponding block hash.
144144
pub struct ValidatedBlock {
145-
block_hash: BlockHash,
145+
pub(crate) block_hash: BlockHash,
146146
inner: Block,
147147
}
148148

0 commit comments

Comments
 (0)