Skip to content

Commit 7b1c96c

Browse files
committed
Proof of concept for BlockSource multiplexing
1 parent d33c7c3 commit 7b1c96c

File tree

2 files changed

+153
-16
lines changed

2 files changed

+153
-16
lines changed

lightning-block-sync/src/lib.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -357,10 +357,10 @@ where P: Poll<'a, B>,
357357
CL: ChainListener + Sized {
358358
chain_tip: (BlockHash, BlockHeaderData),
359359
chain_poller: P,
360-
backup_block_sources: Vec<B>,
361360
header_cache: HeaderCache,
362361
chain_notifier: CL,
363-
mainnet: bool
362+
mainnet: bool,
363+
marker: std::marker::PhantomData<B>,
364364
}
365365

366366
impl<'a, P, B, CL> MicroSPVClient<'a, P, B, CL>
@@ -380,11 +380,12 @@ where P: Poll<'a, B>,
380380
/// useful when you have a block source which is more censorship-resistant than others but
381381
/// which only provides headers. In this case, we can use such source(s) to learn of a censorship
382382
/// attack without giving up privacy by querying a privacy-losing block sources.
383-
pub fn init(chain_tip: BlockHeaderData, chain_poller: P, backup_block_sources: Vec<B>, chain_notifier: CL, mainnet: bool) -> Self {
383+
pub fn init(chain_tip: BlockHeaderData, chain_poller: P, chain_notifier: CL, mainnet: bool) -> Self {
384384
let header_cache = HeaderCache::new();
385385
Self {
386386
chain_tip: (chain_tip.header.block_hash(), chain_tip),
387-
chain_poller, backup_block_sources, header_cache, chain_notifier, mainnet
387+
chain_poller, header_cache, chain_notifier, mainnet,
388+
marker: std::marker::PhantomData
388389
}
389390
}
390391

@@ -420,15 +421,7 @@ where P: Poll<'a, B>,
420421
Ok((ChainTip::Better(new_hash, new_header), block_source)) => {
421422
debug_assert_ne!(new_hash, self.chain_tip.0);
422423
debug_assert!(new_header.chainwork > self.chain_tip.1.chainwork);
423-
let mut blocks_connected = false;
424-
let backup_block_sources = self.backup_block_sources.iter_mut().map(|s| &mut **s);
425-
for source in std::iter::once(block_source).chain(backup_block_sources) {
426-
blocks_connected |= sync_chain_monitor!(new_hash, new_header, source);
427-
if self.chain_tip.0 == new_hash {
428-
break;
429-
}
430-
}
431-
blocks_connected
424+
sync_chain_monitor!(new_hash, new_header, block_source)
432425
},
433426
Ok((ChainTip::Worse(hash, header), _)) => {
434427
debug_assert_ne!(hash, self.chain_tip.0);
@@ -786,8 +779,9 @@ mod tests {
786779
let mut source_three = &header_chain;
787780
let mut source_four = &backup_chain;
788781
let mut client = MicroSPVClient::init((&chain_one).get_header(&block_1a_hash, Some(1)).await.unwrap(),
789-
poller::MultipleChainPoller::new(vec![&mut source_one as &mut dyn BlockSource, &mut source_two as &mut dyn BlockSource, &mut source_three as &mut dyn BlockSource]),
790-
vec![&mut source_four as &mut dyn BlockSource],
782+
poller::ChainMultiplexer::new(
783+
vec![&mut source_one as &mut dyn BlockSource, &mut source_two as &mut dyn BlockSource, &mut source_three as &mut dyn BlockSource],
784+
vec![&mut source_four as &mut dyn BlockSource]),
791785
Arc::clone(&chain_notifier), true);
792786

793787
// Test that we will reorg onto 2b because chain_one knows about 1b + 2b

lightning-block-sync/src/poller.rs

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, ChainTip, Poll};
22

3+
use bitcoin::blockdata::block::Block;
4+
use bitcoin::hash_types::BlockHash;
5+
36
use std::ops::DerefMut;
47

58
pub struct ChainPoller<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send> {
@@ -59,7 +62,6 @@ impl<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send> Multipl
5962
}
6063

6164
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Poll<'b, B> for MultipleChainPoller<'b, B> {
62-
6365
fn poll_chain_tip<'a>(&'a mut self, best_chain_tip: BlockHeaderData) ->
6466
AsyncBlockSourceResult<'a, (ChainTip, &'a mut B::Target)>
6567
where 'b: 'a {
@@ -102,6 +104,147 @@ impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Poll<'b
102104
}
103105
}
104106

107+
pub struct ChainMultiplexer<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> {
108+
block_sources: Vec<(B, BlockSourceError)>,
109+
backup_block_sources: Vec<(B, BlockSourceError)>,
110+
best_block_source: usize,
111+
}
112+
113+
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> ChainMultiplexer<'b, B> {
114+
pub fn new(mut block_sources: Vec<B>, mut backup_block_sources: Vec<B>) -> Self {
115+
assert!(!block_sources.is_empty());
116+
let block_sources = block_sources.drain(..).map(|block_source| {
117+
(block_source, BlockSourceError::Transient)
118+
}).collect();
119+
120+
let backup_block_sources = backup_block_sources.drain(..).map(|block_source| {
121+
(block_source, BlockSourceError::Transient)
122+
}).collect();
123+
124+
Self { block_sources, backup_block_sources, best_block_source: 0 }
125+
}
126+
127+
fn best_and_backup_block_sources(&mut self) -> Vec<&mut (B, BlockSourceError)> {
128+
let best_block_source = self.block_sources.get_mut(self.best_block_source).unwrap();
129+
let backup_block_sources = self.backup_block_sources.iter_mut();
130+
std::iter::once(best_block_source)
131+
.chain(backup_block_sources)
132+
.filter(|(_, e)| e == &BlockSourceError::Transient)
133+
.collect()
134+
}
135+
}
136+
137+
impl<'b, B: 'b + DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Poll<'b, B> for ChainMultiplexer<'b, B> {
138+
fn poll_chain_tip<'a>(&'a mut self, best_chain_tip: BlockHeaderData) ->
139+
AsyncBlockSourceResult<'a, (ChainTip, &'a mut B::Target)>
140+
where 'b: 'a {
141+
Box::pin(async move {
142+
let mut heaviest_chain_tip = best_chain_tip;
143+
let mut best_result = Err(BlockSourceError::Persistent);
144+
for (i, (block_source, error)) in self.block_sources.iter_mut().enumerate() {
145+
if let BlockSourceError::Persistent = error {
146+
continue;
147+
}
148+
149+
let result = match block_source.get_best_block().await {
150+
Err(e) => Err(e),
151+
Ok((block_hash, height)) => {
152+
if block_hash == heaviest_chain_tip.header.block_hash() {
153+
Ok(ChainTip::Common)
154+
} else {
155+
match block_source.get_header(&block_hash, height).await {
156+
Err(e) => Err(e),
157+
Ok(chain_tip) => {
158+
crate::stateless_check_header(&chain_tip.header)?;
159+
if chain_tip.header.block_hash() != block_hash {
160+
Err(BlockSourceError::Persistent)
161+
} else if chain_tip.chainwork <= heaviest_chain_tip.chainwork {
162+
Ok(ChainTip::Worse(block_hash, chain_tip))
163+
} else {
164+
Ok(ChainTip::Better(block_hash, chain_tip))
165+
}
166+
},
167+
}
168+
}
169+
},
170+
};
171+
172+
match result {
173+
Err(BlockSourceError::Persistent) => {
174+
*error = BlockSourceError::Persistent;
175+
},
176+
Err(BlockSourceError::Transient) => {
177+
if best_result.is_err() {
178+
best_result = result;
179+
}
180+
},
181+
Ok(ChainTip::Common) => {
182+
if let Ok(ChainTip::Better(_, _)) = best_result {} else {
183+
best_result = result;
184+
}
185+
},
186+
Ok(ChainTip::Better(_, header)) => {
187+
self.best_block_source = i;
188+
best_result = result;
189+
heaviest_chain_tip = header;
190+
},
191+
Ok(ChainTip::Worse(_, _)) => {
192+
if best_result.is_err() {
193+
best_result = result;
194+
}
195+
},
196+
}
197+
}
198+
199+
match best_result {
200+
Err(e) => Err(e),
201+
Ok(chain_tip) => Ok((chain_tip, self as &mut dyn BlockSource)),
202+
}
203+
})
204+
}
205+
}
206+
207+
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> BlockSource for ChainMultiplexer<'b, B> {
208+
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
209+
Box::pin(async move {
210+
for (block_source, error) in self.best_and_backup_block_sources() {
211+
let result = block_source.get_header(header_hash, height).await;
212+
match result {
213+
Err(e) => *error = e,
214+
Ok(_) => return result,
215+
}
216+
}
217+
Err(BlockSourceError::Persistent)
218+
})
219+
}
220+
221+
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
222+
Box::pin(async move {
223+
for (block_source, error) in self.best_and_backup_block_sources() {
224+
let result = block_source.get_block(header_hash).await;
225+
match result {
226+
Err(e) => *error = e,
227+
Ok(_) => return result,
228+
}
229+
}
230+
Err(BlockSourceError::Persistent)
231+
})
232+
}
233+
234+
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
235+
Box::pin(async move {
236+
for (block_source, error) in self.best_and_backup_block_sources() {
237+
let result = block_source.get_best_block().await;
238+
match result {
239+
Err(e) => *error = e,
240+
Ok(_) => return result,
241+
}
242+
}
243+
Err(BlockSourceError::Persistent)
244+
})
245+
}
246+
}
247+
105248
#[cfg(test)]
106249
mod tests {
107250
use crate::*;

0 commit comments

Comments
 (0)