-
Notifications
You must be signed in to change notification settings - Fork 409
Implement the UtxoSource interface for REST/RPC clients #2248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
8164cb9
01857b5
b315856
3482fce
189c1fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
use crate::{AsyncBlockSourceResult, BlockData, BlockSource}; | ||
|
||
use bitcoin::blockdata::block::Block; | ||
use bitcoin::blockdata::transaction::{TxOut, OutPoint}; | ||
use bitcoin::hash_types::BlockHash; | ||
|
||
|
@@ -17,7 +18,8 @@ use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupErr | |
|
||
use lightning::util::logger::Logger; | ||
|
||
use std::sync::Arc; | ||
use std::sync::{Arc, Mutex}; | ||
use std::collections::VecDeque; | ||
use std::future::Future; | ||
use std::ops::Deref; | ||
|
||
|
@@ -27,9 +29,6 @@ use std::ops::Deref; | |
/// Note that while this is implementable for a [`BlockSource`] which returns filtered block data | ||
/// (i.e. [`BlockData::HeaderOnly`] for [`BlockSource::get_block`] requests), such an | ||
/// implementation will reject all gossip as it is not fully able to verify the UTXOs referenced. | ||
/// | ||
/// For efficiency, an implementation may consider caching some set of blocks, as many redundant | ||
/// calls may be made. | ||
pub trait UtxoSource : BlockSource + 'static { | ||
/// Fetches the block hash of the block at the given height. | ||
/// | ||
|
@@ -91,8 +90,11 @@ pub struct GossipVerifier<S: FutureSpawner, | |
peer_manager: Arc<PeerManager<Descriptor, CM, Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, OM, L, CMH, NS>>, | ||
gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, | ||
spawn: S, | ||
block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>, | ||
} | ||
|
||
const BLOCK_CACHE_SIZE: usize = 5; | ||
|
||
impl<S: FutureSpawner, | ||
Blocks: Deref + Send + Sync + Clone, | ||
L: Deref + Send + Sync, | ||
|
@@ -114,34 +116,76 @@ impl<S: FutureSpawner, | |
/// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for | ||
/// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`]. | ||
pub fn new(source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, peer_manager: Arc<PeerManager<Descriptor, CM, Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, OM, L, CMH, NS>>) -> Self { | ||
Self { source, spawn, gossiper, peer_manager } | ||
Self { | ||
source, spawn, gossiper, peer_manager, | ||
block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))), | ||
} | ||
} | ||
|
||
async fn retrieve_utxo(source: Blocks, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> { | ||
async fn retrieve_utxo( | ||
source: Blocks, block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>, short_channel_id: u64 | ||
) -> Result<TxOut, UtxoLookupError> { | ||
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes | ||
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; | ||
let output_index = (short_channel_id & 0xffff) as u16; | ||
|
||
let block_hash = source.get_block_hash_by_height(block_height).await | ||
.map_err(|_| UtxoLookupError::UnknownTx)?; | ||
let block_data = source.get_block(&block_hash).await | ||
.map_err(|_| UtxoLookupError::UnknownTx)?; | ||
let mut block = match block_data { | ||
BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx), | ||
BlockData::FullBlock(block) => block, | ||
let (outpoint, output); | ||
|
||
'tx_found: loop { // Used as a simple goto | ||
macro_rules! process_block { | ||
($block: expr) => { { | ||
if transaction_index as usize >= $block.txdata.len() { | ||
return Err(UtxoLookupError::UnknownTx); | ||
} | ||
let transaction = &$block.txdata[transaction_index as usize]; | ||
if output_index as usize >= transaction.output.len() { | ||
return Err(UtxoLookupError::UnknownTx); | ||
} | ||
|
||
outpoint = OutPoint::new(transaction.txid(), output_index.into()); | ||
output = transaction.output[output_index as usize].clone(); | ||
} } | ||
} | ||
{ | ||
let recent_blocks = block_cache.lock().unwrap(); | ||
for (height, block) in recent_blocks.iter() { | ||
if *height == block_height { | ||
process_block!(block); | ||
break 'tx_found; | ||
} | ||
} | ||
Comment on lines
+211
to
+217
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Making a note that since we don't move the block to the end of the queue here, we may pop off a block even though it's been used recently - probably not a big deal, but just something that popped up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, I think its fine. Making it LRU is possible, but we'd have to shift the array, which isn't bad but is kinda annoying. Also, I don't think the behavior of "LRFirstUsed" is all that bad, either, really. |
||
} | ||
|
||
let block_hash = source.get_block_hash_by_height(block_height).await | ||
.map_err(|_| UtxoLookupError::UnknownTx)?; | ||
let block_data = source.get_block(&block_hash).await | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could end up fetching the block multiple times here since we're not holding the lock, but probably not a big deal since we're already ok with the bandwidth costs associated with P2P sync and validation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, and we can't hold the lock across an await point. It does suck, I agree, but storing and poll'ing other futures sounded hard 🤷 |
||
.map_err(|_| UtxoLookupError::UnknownTx)?; | ||
let block = match block_data { | ||
BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx), | ||
BlockData::FullBlock(block) => block, | ||
}; | ||
process_block!(block); | ||
{ | ||
let mut recent_blocks = block_cache.lock().unwrap(); | ||
let mut insert = true; | ||
for (height, _) in recent_blocks.iter() { | ||
if *height == block_height { | ||
insert = false; | ||
} | ||
} | ||
if insert { | ||
if recent_blocks.len() >= BLOCK_CACHE_SIZE { | ||
recent_blocks.pop_front(); | ||
} | ||
recent_blocks.push_back((block_height, block)); | ||
} | ||
} | ||
break 'tx_found; | ||
}; | ||
if transaction_index as usize >= block.txdata.len() { | ||
return Err(UtxoLookupError::UnknownTx); | ||
} | ||
let mut transaction = block.txdata.swap_remove(transaction_index as usize); | ||
if output_index as usize >= transaction.output.len() { | ||
return Err(UtxoLookupError::UnknownTx); | ||
} | ||
let outpoint_unspent = | ||
source.is_output_unspent(OutPoint::new(transaction.txid(), output_index.into())).await | ||
.map_err(|_| UtxoLookupError::UnknownTx)?; | ||
source.is_output_unspent(outpoint).await.map_err(|_| UtxoLookupError::UnknownTx)?; | ||
if outpoint_unspent { | ||
Ok(transaction.output.swap_remove(output_index as usize)) | ||
Ok(output) | ||
} else { | ||
Err(UtxoLookupError::UnknownTx) | ||
} | ||
|
@@ -190,9 +234,10 @@ impl<S: FutureSpawner, | |
let fut = res.clone(); | ||
let source = self.source.clone(); | ||
let gossiper = Arc::clone(&self.gossiper); | ||
let block_cache = Arc::clone(&self.block_cache); | ||
let pm = Arc::clone(&self.peer_manager); | ||
self.spawn.spawn(async move { | ||
let res = Self::retrieve_utxo(source, short_channel_id).await; | ||
let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await; | ||
fut.resolve(gossiper.network_graph(), &*gossiper, res); | ||
pm.process_events(); | ||
}); | ||
|
Uh oh!
There was an error while loading. Please reload this page.