Skip to content

Implement the UtxoSource interface for REST/RPC clients #2248

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 25, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 69 additions & 24 deletions lightning-block-sync/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use crate::{AsyncBlockSourceResult, BlockData, BlockSource};

use bitcoin::blockdata::block::Block;
use bitcoin::blockdata::transaction::{TxOut, OutPoint};
use bitcoin::hash_types::BlockHash;

Expand All @@ -17,7 +18,8 @@ use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupErr

use lightning::util::logger::Logger;

use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;

Expand All @@ -27,9 +29,6 @@ use std::ops::Deref;
/// Note that while this is implementable for a [`BlockSource`] which returns filtered block data
/// (i.e. [`BlockData::HeaderOnly`] for [`BlockSource::get_block`] requests), such an
/// implementation will reject all gossip as it is not fully able to verify the UTXOs referenced.
///
/// For efficiency, an implementation may consider caching some set of blocks, as many redundant
/// calls may be made.
pub trait UtxoSource : BlockSource + 'static {
/// Fetches the block hash of the block at the given height.
///
Expand Down Expand Up @@ -91,8 +90,11 @@ pub struct GossipVerifier<S: FutureSpawner,
peer_manager: Arc<PeerManager<Descriptor, CM, Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, OM, L, CMH, NS>>,
gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>,
spawn: S,
block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>,
}

const BLOCK_CACHE_SIZE: usize = 5;

impl<S: FutureSpawner,
Blocks: Deref + Send + Sync + Clone,
L: Deref + Send + Sync,
Expand All @@ -114,34 +116,76 @@ impl<S: FutureSpawner,
/// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for
/// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`].
pub fn new(source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, peer_manager: Arc<PeerManager<Descriptor, CM, Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, OM, L, CMH, NS>>) -> Self {
Self { source, spawn, gossiper, peer_manager }
Self {
source, spawn, gossiper, peer_manager,
block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))),
}
}

async fn retrieve_utxo(source: Blocks, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
async fn retrieve_utxo(
source: Blocks, block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>, short_channel_id: u64
) -> Result<TxOut, UtxoLookupError> {
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
let output_index = (short_channel_id & 0xffff) as u16;

let block_hash = source.get_block_hash_by_height(block_height).await
.map_err(|_| UtxoLookupError::UnknownTx)?;
let block_data = source.get_block(&block_hash).await
.map_err(|_| UtxoLookupError::UnknownTx)?;
let mut block = match block_data {
BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx),
BlockData::FullBlock(block) => block,
let (outpoint, output);

'tx_found: loop { // Used as a simple goto
macro_rules! process_block {
($block: expr) => { {
if transaction_index as usize >= $block.txdata.len() {
return Err(UtxoLookupError::UnknownTx);
}
let transaction = &$block.txdata[transaction_index as usize];
if output_index as usize >= transaction.output.len() {
return Err(UtxoLookupError::UnknownTx);
}

outpoint = OutPoint::new(transaction.txid(), output_index.into());
output = transaction.output[output_index as usize].clone();
} }
}
{
let recent_blocks = block_cache.lock().unwrap();
for (height, block) in recent_blocks.iter() {
if *height == block_height {
process_block!(block);
break 'tx_found;
}
}
Comment on lines +211 to +217
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making a note that since we don't move the block to the end of the queue here, we may pop off a block even though it's been used recently - probably not a big deal, but just something that popped up

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think its fine. Making it LRU is possible, but we'd have to shift the array, which isn't bad but is kinda annoying. Also, I don't think the behavior of "LRFirstUsed" is all that bad, either, really.

}

let block_hash = source.get_block_hash_by_height(block_height).await
.map_err(|_| UtxoLookupError::UnknownTx)?;
let block_data = source.get_block(&block_hash).await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could end up fetching the block multiple times here since we're not holding the lock, but probably not a big deal since we're already ok with the bandwidth costs associated with P2P sync and validation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, and we can't hold the lock across an await point. It does suck, I agree, but storing and poll'ing other futures sounded hard 🤷

.map_err(|_| UtxoLookupError::UnknownTx)?;
let block = match block_data {
BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx),
BlockData::FullBlock(block) => block,
};
process_block!(block);
{
let mut recent_blocks = block_cache.lock().unwrap();
let mut insert = true;
for (height, _) in recent_blocks.iter() {
if *height == block_height {
insert = false;
}
}
if insert {
if recent_blocks.len() >= BLOCK_CACHE_SIZE {
recent_blocks.pop_front();
}
recent_blocks.push_back((block_height, block));
}
}
break 'tx_found;
};
if transaction_index as usize >= block.txdata.len() {
return Err(UtxoLookupError::UnknownTx);
}
let mut transaction = block.txdata.swap_remove(transaction_index as usize);
if output_index as usize >= transaction.output.len() {
return Err(UtxoLookupError::UnknownTx);
}
let outpoint_unspent =
source.is_output_unspent(OutPoint::new(transaction.txid(), output_index.into())).await
.map_err(|_| UtxoLookupError::UnknownTx)?;
source.is_output_unspent(outpoint).await.map_err(|_| UtxoLookupError::UnknownTx)?;
if outpoint_unspent {
Ok(transaction.output.swap_remove(output_index as usize))
Ok(output)
} else {
Err(UtxoLookupError::UnknownTx)
}
Expand Down Expand Up @@ -190,9 +234,10 @@ impl<S: FutureSpawner,
let fut = res.clone();
let source = self.source.clone();
let gossiper = Arc::clone(&self.gossiper);
let block_cache = Arc::clone(&self.block_cache);
let pm = Arc::clone(&self.peer_manager);
self.spawn.spawn(async move {
let res = Self::retrieve_utxo(source, short_channel_id).await;
let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
fut.resolve(gossiper.network_graph(), &*gossiper, res);
pm.process_events();
});
Expand Down