Skip to content

Split out BroadcastInterface, ChainWatchInterface monitors re-enter f… #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
Cargo.lock
/target/
**/*.rs.bk
.idea
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ non_bitcoin_chain_hash_routing = []

[dependencies]
bitcoin = "0.11"
bitcoin-chain = { git = "https://github.com/rust-bitcoin/rust-bitcoin-chain", branch = "master" }
rust-crypto = "0.2"
rand = "0.4"
secp256k1 = "0.8"
Expand Down
12 changes: 7 additions & 5 deletions src/chain/bitcoincorerpcchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bitcoin::blockdata::transaction::Transaction;
use bitcoin::blockdata::script::Script;
use bitcoin::util::hash::Sha256dHash;

use chain::chaininterface::{ChainWatchInterface,ChainWatchInterfaceUtil,ChainListener};
use chain::chaininterface::{ChainWatchInterface,ChainWatchInterfaceUtil,ChainListener, BroadcasterInterface};

use std::sync::Weak;

Expand All @@ -23,15 +23,17 @@ impl ChainWatchInterface for BitcoinCoreRPCClientChain {
self.util.watch_all_txn()
}

fn broadcast_transaction(&self, _tx: &Transaction) {
unimplemented!()
}

fn register_listener(&self, listener: Weak<ChainListener>) {
self.util.register_listener(listener)
}
}

impl BroadcasterInterface for BitcoinCoreRPCClientChain {
fn broadcast_transaction(&self, _tx: &Transaction) {
unimplemented!()
}
}

impl BitcoinCoreRPCClientChain {
pub fn new() -> BitcoinCoreRPCClientChain {
BitcoinCoreRPCClientChain {
Expand Down
91 changes: 67 additions & 24 deletions src/chain/chaininterface.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use bitcoin::blockdata::block::BlockHeader;
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::blockdata::script::Script;
use bitcoin::util::hash::Sha256dHash;

use std::sync::{Weak,Mutex};
use std::sync::{Mutex,Weak,MutexGuard};
use std::sync::atomic::{AtomicUsize, Ordering};

/// An interface to request notification of certain scripts as they appear the
/// chain.
Expand All @@ -21,13 +21,18 @@ pub trait ChainWatchInterface: Sync + Send {
/// Indicates that a listener needs to see all transactions.
fn watch_all_txn(&self);

/// Sends a transaction out to (hopefully) be mined
fn broadcast_transaction(&self, tx: &Transaction);

fn register_listener(&self, listener: Weak<ChainListener>);
//TODO: unregister
}

/// An interface to send a transaction to connected Bitcoin peers.
/// This is for final settlement. An error might indicate that no peers can be reached or
/// that peers rejected the transaction.
pub trait BroadcasterInterface: Sync + Send {
/// Sends a transaction out to (hopefully) be mined
fn broadcast_transaction(&self, tx: &Transaction);
}

/// A trait indicating a desire to listen for events from the chain
pub trait ChainListener: Sync + Send {
/// Notifies a listener that a block was connected.
Expand All @@ -54,67 +59,105 @@ pub enum ConfirmationTarget {
/// called from inside the library in response to ChainListener events, P2P events, or timer
/// events).
pub trait FeeEstimator: Sync + Send {
fn get_est_sat_per_vbyte(&self, ConfirmationTarget) -> u64;
fn get_est_sat_per_vbyte(&self, confirmation_target: ConfirmationTarget) -> u64;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does it matter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compiler complained that anonymous arguments are deprecated.

}

/// Utility to capture some common parts of ChainWatchInterface implementors.
/// Keeping a local copy of this in a ChainWatchInterface implementor is likely useful.
pub struct ChainWatchInterfaceUtil {
watched: Mutex<(Vec<Script>, Vec<(Sha256dHash, u32)>, bool)>, //TODO: Something clever to optimize this
listeners: Mutex<Vec<Weak<ChainListener>>>,
reentered: AtomicUsize
}

impl ChainWatchInterfaceUtil {
pub fn new() -> ChainWatchInterfaceUtil {
ChainWatchInterfaceUtil {
watched: Mutex::new((Vec::new(), Vec::new(), false)),
listeners: Mutex::new(Vec::new()),
}
}

pub fn install_watch_script(&self, spk: Script) {
/// Register listener
impl ChainWatchInterface for ChainWatchInterfaceUtil {
fn install_watch_script(&self, script_pub_key: Script) {
let mut watched = self.watched.lock().unwrap();
watched.0.push(Script::from(spk));
watched.0.push(Script::from(script_pub_key));
self.reentered.fetch_add(1, Ordering::Relaxed);
}

pub fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) {
fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) {
let mut watched = self.watched.lock().unwrap();
watched.1.push(outpoint);
self.reentered.fetch_add(1, Ordering::Relaxed);
}

pub fn watch_all_txn(&self) { //TODO: refcnt this?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this TODO was intended to point out that we may want to refcount the watch_all, and have something like an unwatch_all, but then you'd need to have a count of how many times it was called? Anyway, probably worth getting rid of...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is also no unregister. Let see if it is sufficient.

fn watch_all_txn(&self) {
let mut watched = self.watched.lock().unwrap();
watched.2 = true;
self.reentered.fetch_add(1, Ordering::Relaxed);
}

pub fn register_listener(&self, listener: Weak<ChainListener>) {
fn register_listener(&self, listener: Weak<ChainListener>) {
let mut vec = self.listeners.lock().unwrap();
vec.push(listener);
}
}

pub fn do_call_block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
impl ChainWatchInterfaceUtil {
pub fn new() -> ChainWatchInterfaceUtil {
ChainWatchInterfaceUtil {
watched: Mutex::new((Vec::new(), Vec::new(), false)),
listeners: Mutex::new(Vec::new()),
reentered: AtomicUsize::new(1)
}
}

/// notify listener that a block was connected
/// notification will repeat if notified listener register new listeners
pub fn block_connected_with_filtering(&self, block: &Block, height: u32) {
let mut reentered = true;
while reentered {
let mut matched = Vec::new();
let mut matched_index = Vec::new();
{
let watched = self.watched.lock().unwrap();
for (index, transaction) in block.txdata.iter().enumerate() {
if self.does_match_tx_unguarded(transaction, &watched) {
matched.push(transaction);
matched_index.push(index as u32);
}
}
}
reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
}
}

/// notify listener that a block was disconnected
pub fn block_disconnected(&self, header: &BlockHeader) {
let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
Some(arc) => arc.block_disconnected(header),
None => ()
}
}
}

pub fn do_call_block_disconnected(&self, header: &BlockHeader) {
/// call listeners for connected blocks if they are still around.
/// returns true if notified listeners registered additional listener
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
let last_seen = self.reentered.load(Ordering::Relaxed);

let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_disconnected(header),
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
None => ()
}
}
return last_seen != self.reentered.load(Ordering::Relaxed);
}

/// Checks if a given transaction matches the current filter
pub fn does_match_tx(&self, tx: &Transaction) -> bool {
let watched = self.watched.lock().unwrap();
self.does_match_tx_unguarded (tx, &watched)
}

fn does_match_tx_unguarded (&self, tx: &Transaction, watched: &MutexGuard<(Vec<Script>, Vec<(Sha256dHash, u32)>, bool)>) -> bool {
if watched.2 {
return true;
}
Expand Down
1 change: 0 additions & 1 deletion src/chain/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod chaininterface;
pub mod bitcoincorerpcchain;
pub mod rustbitcoinchain;
66 changes: 0 additions & 66 deletions src/chain/rustbitcoinchain.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![crate_name = "lightning"]

extern crate bitcoin;
extern crate bitcoin_chain;
extern crate secp256k1;
extern crate rand;
extern crate crypto;
Expand Down
4 changes: 2 additions & 2 deletions src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,10 +1392,10 @@ mod tests {
fn confirm_transaction(chain: &test_utils::TestWatchInterface, tx: &Transaction) {
let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
let chan_id = unsafe { CHAN_COUNT };
chain.watch_util.do_call_block_connected(&header, 1, &[tx; 1], &[chan_id as u32; 1]);
chain.watch_util.block_connected_checked(&header, 1, &[tx; 1], &[chan_id as u32; 1]);
for i in 2..100 {
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
chain.watch_util.do_call_block_connected(&header, i, &[tx; 0], &[0; 0]);
chain.watch_util.block_connected_checked(&header, i, &[tx; 0], &[0; 0]);
}
}

Expand Down
14 changes: 8 additions & 6 deletions src/ln/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use secp256k1::key::{SecretKey,PublicKey};
use ln::msgs::HandleError;
use ln::chan_utils;
use ln::chan_utils::HTLCOutputInCommitment;
use chain::chaininterface::{ChainListener,ChainWatchInterface};
use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface};

use std::collections::HashMap;
use std::sync::{Arc,Mutex};
Expand All @@ -39,24 +39,26 @@ pub trait ManyChannelMonitor: Send + Sync {
pub struct SimpleManyChannelMonitor<Key> {
monitors: Mutex<HashMap<Key, ChannelMonitor>>,
chain_monitor: Arc<ChainWatchInterface>,
broadcaster: Arc<BroadcasterInterface>
}

impl<Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonitor<Key> {
fn block_connected(&self, _header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
let monitors = self.monitors.lock().unwrap();
for monitor in monitors.values() {
monitor.block_connected(txn_matched, height, &*self.chain_monitor);
monitor.block_connected(txn_matched, height, &*self.broadcaster);
}
}

fn block_disconnected(&self, _: &BlockHeader) { }
}

impl<Key : Send + cmp::Eq + hash::Hash + 'static> SimpleManyChannelMonitor<Key> {
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> Arc<SimpleManyChannelMonitor<Key>> {
pub fn new(chain_monitor: Arc<ChainWatchInterface>, broadcaster: Arc<BroadcasterInterface>) -> Arc<SimpleManyChannelMonitor<Key>> {
let res = Arc::new(SimpleManyChannelMonitor {
monitors: Mutex::new(HashMap::new()),
chain_monitor: chain_monitor,
chain_monitor,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thats cool...learn a new syntax every day, I guess :p.

broadcaster
});
let weak_res = Arc::downgrade(&res);
res.chain_monitor.register_listener(weak_res);
Expand Down Expand Up @@ -443,7 +445,7 @@ impl ChannelMonitor {
txn_to_broadcast
}

fn block_connected(&self, txn_matched: &[&Transaction], height: u32, chain_monitor: &ChainWatchInterface) {
fn block_connected(&self, txn_matched: &[&Transaction], height: u32, broadcaster: &BroadcasterInterface) {
for tx in txn_matched {
if tx.input.len() != 1 {
// We currently only ever sign something spending a commitment or HTLC
Expand All @@ -454,7 +456,7 @@ impl ChannelMonitor {
for txin in tx.input.iter() {
if self.funding_txo.is_none() || (txin.prev_hash == self.funding_txo.unwrap().0 && txin.prev_index == self.funding_txo.unwrap().1 as u32) {
for tx in self.check_spend_transaction(tx, height).iter() {
chain_monitor.broadcast_transaction(tx);
broadcaster.broadcast_transaction(tx); // TODO: use result
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ impl chaininterface::ChainWatchInterface for TestWatchInterface {
fn watch_all_txn(&self) {
unimplemented!();
}
fn broadcast_transaction(&self, _tx: &Transaction) {
unimplemented!();
}
fn register_listener(&self, listener: Weak<chaininterface::ChainListener>) {
self.watch_util.register_listener(listener);
}
Expand Down