Skip to content

Split out BroadcastInterface, ChainWatchInterface monitors re-enter f… #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ non_bitcoin_chain_hash_routing = []

[dependencies]
bitcoin = "0.11"
bitcoin-chain = { git = "https://github.com/rust-bitcoin/rust-bitcoin-chain", branch = "master" }
rust-crypto = "0.2"
rand = "0.4"
secp256k1 = "0.8"
Expand Down
22 changes: 17 additions & 5 deletions src/chain/bitcoincorerpcchain.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::error::Error;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import.

use bitcoin::blockdata::transaction::Transaction;
use bitcoin::blockdata::script::Script;
use bitcoin::blockdata::block::{Block, BlockHeader};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import.

use bitcoin::util::hash::Sha256dHash;

use chain::chaininterface::{ChainWatchInterface,ChainWatchInterfaceUtil,ChainListener};
use chain::chaininterface::{ChainWatchInterface,ChainWatchInterfaceUtil,ChainListener, BroadcasterInterface};

use std::sync::Weak;

Expand All @@ -23,19 +25,29 @@ impl ChainWatchInterface for BitcoinCoreRPCClientChain {
self.util.watch_all_txn()
}

fn broadcast_transaction(&self, _tx: &Transaction) {
unimplemented!()
}

fn register_listener(&self, listener: Weak<ChainListener>) {
self.util.register_listener(listener)
}
}

impl BroadcasterInterface for BitcoinCoreRPCClientChain {
fn broadcast_transaction(&self, tx: &Transaction) -> Result<(), Box<Error>> {
unimplemented!()
}
}

impl BitcoinCoreRPCClientChain {
pub fn new() -> BitcoinCoreRPCClientChain {
BitcoinCoreRPCClientChain {
util: ChainWatchInterfaceUtil::new(),
}
}

pub fn block_connected(&self, block: &Block, height: u32) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda envisioned this would have some polling thread (or a function to hit to make it poll) pinging the RPC and then generate the events there, not expose a pub fn which lets users call block_connected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of SPV this is not polling, but an incoming headers message processed and will call this.

self.util.block_connected(block, height)
}

pub fn block_disconnected(&self, header: &BlockHeader) {
self.util.block_disconnected(header)
}
}
86 changes: 64 additions & 22 deletions src/chain/chaininterface.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use bitcoin::blockdata::block::BlockHeader;
use std::error::Error;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import.

use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::blockdata::script::Script;
use bitcoin::util::hash::Sha256dHash;

use std::sync::{Weak,Mutex};
use std::sync::{Mutex,Weak};
use std::sync::atomic::{AtomicUsize, Ordering};

/// An interface to request notification of certain scripts as they appear the
/// chain.
Expand All @@ -21,13 +22,18 @@ pub trait ChainWatchInterface: Sync + Send {
/// Indicates that a listener needs to see all transactions.
fn watch_all_txn(&self);

/// Sends a transaction out to (hopefully) be mined
fn broadcast_transaction(&self, tx: &Transaction);

fn register_listener(&self, listener: Weak<ChainListener>);
//TODO: unregister
}

/// An interface to send a transaction to connected Bitcoin peers.
/// This is for final settlement. An error might indicate that no peers can be reached or
/// that peers rejected the transaction.
pub trait BroadcasterInterface: Sync + Send {
/// Sends a transaction out to (hopefully) be mined
fn broadcast_transaction(&self, tx: &Transaction) -> Result<(), Box<Error>>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if its useful to return a Result here - Bitcoin transaction broadcasting is always somewhat a "fire and see if anything happens" process. In other words, there is always a high chance of undetectable failure, so trying to return failure is a bit useless, or to put it another way, is there any useful handling you can do except to try again later (which should happen anyway?). Maybe I'm just being idealistic about it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are immediate error conditions that would better be communicated back, such as no connection to any peer or all peers rejected the message. An Ok result surely does not mean any confirmation, but without it does not even make sense to wait if anything happens as it won't.

}

/// A trait indicating a desire to listen for events from the chain
pub trait ChainListener: Sync + Send {
/// Notifies a listener that a block was connected.
Expand All @@ -54,44 +60,79 @@ pub enum ConfirmationTarget {
/// called from inside the library in response to ChainListener events, P2P events, or timer
/// events).
pub trait FeeEstimator: Sync + Send {
fn get_est_sat_per_vbyte(&self, ConfirmationTarget) -> u64;
fn get_est_sat_per_vbyte(&self, confirmation_target: ConfirmationTarget) -> u64;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does it matter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compiler complained that anonymous arguments are deprecated.

}

/// Utility to capture some common parts of ChainWatchInterface implementors.
/// Keeping a local copy of this in a ChainWatchInterface implementor is likely useful.
pub struct ChainWatchInterfaceUtil {
watched: Mutex<(Vec<Script>, Vec<(Sha256dHash, u32)>, bool)>, //TODO: Something clever to optimize this
listeners: Mutex<Vec<Weak<ChainListener>>>,
reentered: AtomicUsize
}

impl ChainWatchInterfaceUtil {
pub fn new() -> ChainWatchInterfaceUtil {
ChainWatchInterfaceUtil {
watched: Mutex::new((Vec::new(), Vec::new(), false)),
listeners: Mutex::new(Vec::new()),
}
}

pub fn install_watch_script(&self, spk: Script) {
/// Register listener
impl ChainWatchInterface for ChainWatchInterfaceUtil {
fn install_watch_script(&self, script_pub_key: Script) {
let mut watched = self.watched.lock().unwrap();
watched.0.push(Script::from(spk));
watched.0.push(Script::from(script_pub_key));
self.reentered.fetch_add(1, Ordering::Relaxed);
}

pub fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) {
fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) {
let mut watched = self.watched.lock().unwrap();
watched.1.push(outpoint);
self.reentered.fetch_add(1, Ordering::Relaxed);
}

pub fn watch_all_txn(&self) { //TODO: refcnt this?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this TODO was intended to point out that we may want to refcount the watch_all, and have something like an unwatch_all, but then you'd need to have a count of how many times it was called? Anyway, probably worth getting rid of...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is also no unregister. Let see if it is sufficient.

fn watch_all_txn(&self) {
let mut watched = self.watched.lock().unwrap();
watched.2 = true;
self.reentered.fetch_add(1, Ordering::Relaxed);
}

pub fn register_listener(&self, listener: Weak<ChainListener>) {
fn register_listener(&self, listener: Weak<ChainListener>) {
let mut vec = self.listeners.lock().unwrap();
vec.push(listener);
}
}

impl ChainWatchInterfaceUtil {
pub fn new() -> ChainWatchInterfaceUtil {
ChainWatchInterfaceUtil {
watched: Mutex::new((Vec::new(), Vec::new(), false)),
listeners: Mutex::new(Vec::new()),
reentered: AtomicUsize::new(1)
}
}

/// notify listener that a block was connected
/// notification will repeat if notified listener register new listeners
pub fn block_connected(&self, block: &Block, height: u32) {
let mut watch = self.reentered.load(Ordering::Relaxed);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is relaxed actually OK here? If you have multiple threads calling fetch_add(1) are you allowed to see reentered go backwards? I assume not, but I dont think the performance hit of something stricter matters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enters are not strictly counted, the requirement is only to notice that there was a re-entry.

let mut last_seen = 0;
// re-scan if new watch added during previous scan
while last_seen != watch {
let mut matched = Vec::new();
let mut matched_index = Vec::new();
for (index, transaction) in block.txdata.iter().enumerate() {
if self.does_match_tx(transaction) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, you're taking a lock inside of the tight loop here, maybe worth pulling the code into this function and out of the helper so that you dont do so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it would make sense to pull in the helper code. I did not here to make review easier not changing helper if not strictly needed.

matched.push(transaction);
matched_index.push(index as u32);
}
}
last_seen = watch;
self.do_call_block_connected(&block.header, height, matched.as_slice(), matched_index.as_slice());
watch = self.reentered.load(Ordering::Relaxed);
}
}

/// notify listener that a block was disconnected
pub fn block_disconnected(&self, header: &BlockHeader) {
self.do_call_block_disconnected(header);
}

/// call listeners for connected blocks if they are still around
pub fn do_call_block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
Expand All @@ -102,7 +143,8 @@ impl ChainWatchInterfaceUtil {
}
}

pub fn do_call_block_disconnected(&self, header: &BlockHeader) {
/// call listeners for disconnected blocks if they are still around
fn do_call_block_disconnected(&self, header: &BlockHeader) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make this non-pub?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

block_connected and block_disconnected meant to be called not these. I would probably inline them into those if I were not trying to make review easier.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just delete this function and move it into block_disconnected instead of having the stub?

let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
match listener.upgrade() {
Expand All @@ -113,7 +155,7 @@ impl ChainWatchInterfaceUtil {
}

/// Checks if a given transaction matches the current filter
pub fn does_match_tx(&self, tx: &Transaction) -> bool {
fn does_match_tx(&self, tx: &Transaction) -> bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why skip this from being pub? If people use some compression-like block downloads (BIP 37, the neutrino stuff, some PIR scheme, whatever) then its probably still needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you are right. I make it pub again

let watched = self.watched.lock().unwrap();
if watched.2 {
return true;
Expand Down
1 change: 0 additions & 1 deletion src/chain/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod chaininterface;
pub mod bitcoincorerpcchain;
pub mod rustbitcoinchain;
66 changes: 0 additions & 66 deletions src/chain/rustbitcoinchain.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![crate_name = "lightning"]

extern crate bitcoin;
extern crate bitcoin_chain;
extern crate secp256k1;
extern crate rand;
extern crate crypto;
Expand Down
14 changes: 8 additions & 6 deletions src/ln/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use secp256k1::key::{SecretKey,PublicKey};
use ln::msgs::HandleError;
use ln::chan_utils;
use ln::chan_utils::HTLCOutputInCommitment;
use chain::chaininterface::{ChainListener,ChainWatchInterface};
use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface};

use std::collections::HashMap;
use std::sync::{Arc,Mutex};
Expand All @@ -39,24 +39,26 @@ pub trait ManyChannelMonitor: Send + Sync {
pub struct SimpleManyChannelMonitor<Key> {
monitors: Mutex<HashMap<Key, ChannelMonitor>>,
chain_monitor: Arc<ChainWatchInterface>,
broadcaster: Arc<BroadcasterInterface>
}

impl<Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonitor<Key> {
fn block_connected(&self, _header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let monitors = self.monitors.lock().unwrap();
for monitor in monitors.values() {
monitor.block_connected(txn_matched, height, &*self.chain_monitor);
monitor.block_connected(txn_matched, height, &*self.broadcaster);
}
}

fn block_disconnected(&self, _: &BlockHeader) { }
}

impl<Key : Send + cmp::Eq + hash::Hash + 'static> SimpleManyChannelMonitor<Key> {
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> Arc<SimpleManyChannelMonitor<Key>> {
pub fn new(chain_monitor: Arc<ChainWatchInterface>, broadcaster: Arc<BroadcasterInterface>) -> Arc<SimpleManyChannelMonitor<Key>> {
let res = Arc::new(SimpleManyChannelMonitor {
monitors: Mutex::new(HashMap::new()),
chain_monitor: chain_monitor,
chain_monitor,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thats cool...learn a new syntax every day, I guess :p.

broadcaster
});
let weak_res = Arc::downgrade(&res);
res.chain_monitor.register_listener(weak_res);
Expand Down Expand Up @@ -443,7 +445,7 @@ impl ChannelMonitor {
txn_to_broadcast
}

fn block_connected(&self, txn_matched: &[&Transaction], height: u32, chain_monitor: &ChainWatchInterface) {
fn block_connected(&self, txn_matched: &[&Transaction], height: u32, broadcaster: &BroadcasterInterface) {
for tx in txn_matched {
if tx.input.len() != 1 {
// We currently only ever sign something spending a commitment or HTLC
Expand All @@ -454,7 +456,7 @@ impl ChannelMonitor {
for txin in tx.input.iter() {
if self.funding_txo.is_none() || (txin.prev_hash == self.funding_txo.unwrap().0 && txin.prev_index == self.funding_txo.unwrap().1 as u32) {
for tx in self.check_spend_transaction(tx, height).iter() {
chain_monitor.broadcast_transaction(tx);
broadcaster.broadcast_transaction(tx); // TODO: use result
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ impl chaininterface::ChainWatchInterface for TestWatchInterface {
fn watch_all_txn(&self) {
unimplemented!();
}
fn broadcast_transaction(&self, _tx: &Transaction) {
unimplemented!();
}
fn register_listener(&self, listener: Weak<chaininterface::ChainListener>) {
self.watch_util.register_listener(listener);
}
Expand Down