-
Notifications
You must be signed in to change notification settings - Fork 411
Split out BroadcastInterface, ChainWatchInterface monitors re-enter f… #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c58e2d6
8dde2f6
505c1f1
db224f7
abda523
e820c1c
15165f1
3e0c8a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,3 +3,4 @@ | |
Cargo.lock | ||
/target/ | ||
**/*.rs.bk | ||
.idea |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
use bitcoin::blockdata::block::BlockHeader; | ||
use bitcoin::blockdata::block::{Block, BlockHeader}; | ||
use bitcoin::blockdata::transaction::Transaction; | ||
use bitcoin::blockdata::script::Script; | ||
use bitcoin::util::hash::Sha256dHash; | ||
|
||
use std::sync::{Weak,Mutex}; | ||
use std::sync::{Mutex,Weak,MutexGuard}; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
|
||
/// An interface to request notification of certain scripts as they appear the | ||
/// chain. | ||
|
@@ -21,13 +21,18 @@ pub trait ChainWatchInterface: Sync + Send { | |
/// Indicates that a listener needs to see all transactions. | ||
fn watch_all_txn(&self); | ||
|
||
/// Sends a transaction out to (hopefully) be mined | ||
fn broadcast_transaction(&self, tx: &Transaction); | ||
|
||
fn register_listener(&self, listener: Weak<ChainListener>); | ||
//TODO: unregister | ||
} | ||
|
||
/// An interface to send a transaction to connected Bitcoin peers. | ||
/// This is for final settlement. An error might indicate that no peers can be reached or | ||
/// that peers rejected the transaction. | ||
pub trait BroadcasterInterface: Sync + Send { | ||
/// Sends a transaction out to (hopefully) be mined | ||
fn broadcast_transaction(&self, tx: &Transaction); | ||
} | ||
|
||
/// A trait indicating a desire to listen for events from the chain | ||
pub trait ChainListener: Sync + Send { | ||
/// Notifies a listener that a block was connected. | ||
|
@@ -54,67 +59,105 @@ pub enum ConfirmationTarget { | |
/// called from inside the library in response to ChainListener events, P2P events, or timer | ||
/// events). | ||
pub trait FeeEstimator: Sync + Send { | ||
fn get_est_sat_per_vbyte(&self, ConfirmationTarget) -> u64; | ||
fn get_est_sat_per_vbyte(&self, confirmation_target: ConfirmationTarget) -> u64; | ||
} | ||
|
||
/// Utility to capture some common parts of ChainWatchInterface implementors. | ||
/// Keeping a local copy of this in a ChainWatchInterface implementor is likely useful. | ||
pub struct ChainWatchInterfaceUtil { | ||
watched: Mutex<(Vec<Script>, Vec<(Sha256dHash, u32)>, bool)>, //TODO: Something clever to optimize this | ||
listeners: Mutex<Vec<Weak<ChainListener>>>, | ||
reentered: AtomicUsize | ||
} | ||
|
||
impl ChainWatchInterfaceUtil { | ||
pub fn new() -> ChainWatchInterfaceUtil { | ||
ChainWatchInterfaceUtil { | ||
watched: Mutex::new((Vec::new(), Vec::new(), false)), | ||
listeners: Mutex::new(Vec::new()), | ||
} | ||
} | ||
|
||
pub fn install_watch_script(&self, spk: Script) { | ||
/// Register listener | ||
impl ChainWatchInterface for ChainWatchInterfaceUtil { | ||
fn install_watch_script(&self, script_pub_key: Script) { | ||
let mut watched = self.watched.lock().unwrap(); | ||
watched.0.push(Script::from(spk)); | ||
watched.0.push(Script::from(script_pub_key)); | ||
self.reentered.fetch_add(1, Ordering::Relaxed); | ||
} | ||
|
||
pub fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) { | ||
fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) { | ||
let mut watched = self.watched.lock().unwrap(); | ||
watched.1.push(outpoint); | ||
self.reentered.fetch_add(1, Ordering::Relaxed); | ||
} | ||
|
||
pub fn watch_all_txn(&self) { //TODO: refcnt this? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this TODO was intended to point out that we may want to refcount the watch_all, and have something like an unwatch_all, but then you'd need to have a count of how many times it was called? Anyway, probably worth getting rid of... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is also no unregister. Let see if it is sufficient. |
||
fn watch_all_txn(&self) { | ||
let mut watched = self.watched.lock().unwrap(); | ||
watched.2 = true; | ||
self.reentered.fetch_add(1, Ordering::Relaxed); | ||
} | ||
|
||
pub fn register_listener(&self, listener: Weak<ChainListener>) { | ||
fn register_listener(&self, listener: Weak<ChainListener>) { | ||
let mut vec = self.listeners.lock().unwrap(); | ||
vec.push(listener); | ||
} | ||
} | ||
|
||
pub fn do_call_block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) { | ||
impl ChainWatchInterfaceUtil { | ||
pub fn new() -> ChainWatchInterfaceUtil { | ||
ChainWatchInterfaceUtil { | ||
watched: Mutex::new((Vec::new(), Vec::new(), false)), | ||
listeners: Mutex::new(Vec::new()), | ||
reentered: AtomicUsize::new(1) | ||
} | ||
} | ||
|
||
/// notify listener that a block was connected | ||
/// notification will repeat if notified listener register new listeners | ||
pub fn block_connected_with_filtering(&self, block: &Block, height: u32) { | ||
let mut reentered = true; | ||
while reentered { | ||
let mut matched = Vec::new(); | ||
let mut matched_index = Vec::new(); | ||
{ | ||
let watched = self.watched.lock().unwrap(); | ||
for (index, transaction) in block.txdata.iter().enumerate() { | ||
if self.does_match_tx_unguarded(transaction, &watched) { | ||
matched.push(transaction); | ||
matched_index.push(index as u32); | ||
} | ||
} | ||
} | ||
reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice()); | ||
} | ||
} | ||
|
||
/// notify listener that a block was disconnected | ||
pub fn block_disconnected(&self, header: &BlockHeader) { | ||
let listeners = self.listeners.lock().unwrap().clone(); | ||
for listener in listeners.iter() { | ||
match listener.upgrade() { | ||
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched), | ||
Some(arc) => arc.block_disconnected(header), | ||
None => () | ||
} | ||
} | ||
} | ||
|
||
pub fn do_call_block_disconnected(&self, header: &BlockHeader) { | ||
/// call listeners for connected blocks if they are still around. | ||
/// returns true if notified listeners registered additional listener | ||
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool { | ||
let last_seen = self.reentered.load(Ordering::Relaxed); | ||
|
||
let listeners = self.listeners.lock().unwrap().clone(); | ||
for listener in listeners.iter() { | ||
match listener.upgrade() { | ||
Some(arc) => arc.block_disconnected(header), | ||
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched), | ||
None => () | ||
} | ||
} | ||
return last_seen != self.reentered.load(Ordering::Relaxed); | ||
} | ||
|
||
/// Checks if a given transaction matches the current filter | ||
pub fn does_match_tx(&self, tx: &Transaction) -> bool { | ||
let watched = self.watched.lock().unwrap(); | ||
self.does_match_tx_unguarded (tx, &watched) | ||
} | ||
|
||
fn does_match_tx_unguarded (&self, tx: &Transaction, watched: &MutexGuard<(Vec<Script>, Vec<(Sha256dHash, u32)>, bool)>) -> bool { | ||
if watched.2 { | ||
return true; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,2 @@ | ||
pub mod chaininterface; | ||
pub mod bitcoincorerpcchain; | ||
pub mod rustbitcoinchain; |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ use secp256k1::key::{SecretKey,PublicKey}; | |
use ln::msgs::HandleError; | ||
use ln::chan_utils; | ||
use ln::chan_utils::HTLCOutputInCommitment; | ||
use chain::chaininterface::{ChainListener,ChainWatchInterface}; | ||
use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface}; | ||
|
||
use std::collections::HashMap; | ||
use std::sync::{Arc,Mutex}; | ||
|
@@ -39,24 +39,26 @@ pub trait ManyChannelMonitor: Send + Sync { | |
pub struct SimpleManyChannelMonitor<Key> { | ||
monitors: Mutex<HashMap<Key, ChannelMonitor>>, | ||
chain_monitor: Arc<ChainWatchInterface>, | ||
broadcaster: Arc<BroadcasterInterface> | ||
} | ||
|
||
impl<Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonitor<Key> { | ||
fn block_connected(&self, _header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) { | ||
let monitors = self.monitors.lock().unwrap(); | ||
for monitor in monitors.values() { | ||
monitor.block_connected(txn_matched, height, &*self.chain_monitor); | ||
monitor.block_connected(txn_matched, height, &*self.broadcaster); | ||
} | ||
} | ||
|
||
fn block_disconnected(&self, _: &BlockHeader) { } | ||
} | ||
|
||
impl<Key : Send + cmp::Eq + hash::Hash + 'static> SimpleManyChannelMonitor<Key> { | ||
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> Arc<SimpleManyChannelMonitor<Key>> { | ||
pub fn new(chain_monitor: Arc<ChainWatchInterface>, broadcaster: Arc<BroadcasterInterface>) -> Arc<SimpleManyChannelMonitor<Key>> { | ||
let res = Arc::new(SimpleManyChannelMonitor { | ||
monitors: Mutex::new(HashMap::new()), | ||
chain_monitor: chain_monitor, | ||
chain_monitor, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, thats cool...learn a new syntax every day, I guess :p. |
||
broadcaster | ||
}); | ||
let weak_res = Arc::downgrade(&res); | ||
res.chain_monitor.register_listener(weak_res); | ||
|
@@ -443,7 +445,7 @@ impl ChannelMonitor { | |
txn_to_broadcast | ||
} | ||
|
||
fn block_connected(&self, txn_matched: &[&Transaction], height: u32, chain_monitor: &ChainWatchInterface) { | ||
fn block_connected(&self, txn_matched: &[&Transaction], height: u32, broadcaster: &BroadcasterInterface) { | ||
for tx in txn_matched { | ||
if tx.input.len() != 1 { | ||
// We currently only ever sign something spending a commitment or HTLC | ||
|
@@ -454,7 +456,7 @@ impl ChannelMonitor { | |
for txin in tx.input.iter() { | ||
if self.funding_txo.is_none() || (txin.prev_hash == self.funding_txo.unwrap().0 && txin.prev_index == self.funding_txo.unwrap().1 as u32) { | ||
for tx in self.check_spend_transaction(tx, height).iter() { | ||
chain_monitor.broadcast_transaction(tx); | ||
broadcaster.broadcast_transaction(tx); // TODO: use result | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Does it matter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compiler complained that anonymous arguments are deprecated.