Skip to content

Support broadcasting multiple transactions at once #2272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Router for FuzzRouter {

pub struct TestBroadcaster {}
impl BroadcasterInterface for TestBroadcaster {
fn broadcast_transaction(&self, _tx: &Transaction) { }
fn broadcast_transactions(&self, _txs: &[&Transaction]) { }
}

pub struct VecWriter(pub Vec<u8>);
Expand Down
5 changes: 3 additions & 2 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ struct TestBroadcaster {
txn_broadcasted: Mutex<Vec<Transaction>>,
}
impl BroadcasterInterface for TestBroadcaster {
fn broadcast_transaction(&self, tx: &Transaction) {
self.txn_broadcasted.lock().unwrap().push(tx.clone());
fn broadcast_transactions(&self, txs: &[&Transaction]) {
let owned_txs: Vec<Transaction> = txs.iter().map(|tx| (*tx).clone()).collect();
self.txn_broadcasted.lock().unwrap().extend(owned_txs);
}
}

Expand Down
16 changes: 14 additions & 2 deletions lightning/src/chain/chaininterface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@ use bitcoin::blockdata::transaction::Transaction;

/// An interface to send a transaction to the Bitcoin network.
pub trait BroadcasterInterface {
/// Sends a transaction out to (hopefully) be mined.
fn broadcast_transaction(&self, tx: &Transaction);
/// Sends a list of transactions out to (hopefully) be mined.
/// This only needs to handle the actual broadcasting of transactions, LDK will automatically
/// rebroadcast transactions that haven't made it into a block.
///
/// In some cases LDK may attempt to broadcast a transaction which double-spends another
/// and this isn't a bug and can be safely ignored.
///
/// If more than one transaction is given, these transactions should be considered to be a
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to say its a package if they're not a package? Maybe transactions may be a package? Dunno what the right phrasing is to make clear it (a) may be a package, and (b) if it is a package (but we're not telling you if it is or not) you need to make sure its handled as a package. Dont think we need to update the API for that given Core handles it the same either way, but I wonder if there's any P2P-based clients that would benefit from a flag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that's why I said these transactions should be considered to be a package. so they assume it is one but doesn't sound like it always is one.

wording is hard here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wording is always hard :(

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, yea, I think this LGTM.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I wonder if there's any P2P-based clients that would benefit from a flag?

Note there is a) Core’s RPC definition of a package (submitpackage), a mining code definition of a package CreateNewBlock and c) the WIP BIP331 defining package at the p2p-level, so which definition the user is expected to respect here ?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to use the P2P definition, can we reference to BIP331 directly in the code/documentation (nothing it's not final as of LDK upcoming v0.0.116 release) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could link to that, but that feels more like "the things LDK needs to do to create its packages" than "what the user needs to be aware of to implement this" - rather, we can just be explicit and say "via BIP331 over the P2P protocol or via the XXX Bitcoin Core RPC"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, just thought it'd be nice to provide some more literature if they're not too familiar with the concept and that breaks it down a bit better than diving straight into the BIP.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes with the warning than packages.md might be more loose than p2p packages as defined in BIP331.

/// package and broadcast together. Some of the transactions may or may not depend on each other,
/// be sure to manage both cases correctly.
///
/// Bitcoin transaction packages are defined in BIP 331 and here:
/// https://github.com/bitcoin/bitcoin/blob/master/doc/policy/packages.md
fn broadcast_transactions(&self, txs: &[&Transaction]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker, but we could make this a bit more flexible with the use of Borrow.

diff --git a/lightning/src/chain/chaininterface.rs b/lightning/src/chain/chaininterface.rs
index 8bbcb56e..62ee8b41 100644
--- a/lightning/src/chain/chaininterface.rs
+++ b/lightning/src/chain/chaininterface.rs
@@ -13,7 +13,7 @@
 //! Includes traits for monitoring and receiving notifications of new blocks and block
 //! disconnections, transaction broadcasting, and feerate information requests.
 
-use core::{cmp, ops::Deref};
+use core::{borrow::Borrow, cmp, ops::Deref};
 
 use bitcoin::blockdata::transaction::Transaction;
 
@@ -29,7 +29,7 @@ pub trait BroadcasterInterface {
 	/// If more than one transaction is given, these transactions should be considered to be a
 	/// package and broadcast together. Some of the transactions may or may not depend on each other,
 	/// be sure to manage both cases correctly.
-	fn broadcast_transactions(&self, txs: &[&Transaction]);
+	fn broadcast_transactions<T: Borrow<Transaction>>(&self, txs: &[T]);
 }
 
 /// An enum that represents the speed at which we want a transaction to confirm used for feerate
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 4b3cc411..4c4b39c7 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -48,6 +48,7 @@ use regex;
 
 use crate::io;
 use crate::prelude::*;
+use core::borrow::Borrow;
 use core::cell::RefCell;
 use core::time::Duration;
 use crate::sync::{Mutex, Arc};
@@ -188,7 +189,7 @@ pub struct TestChainMonitor<'a> {
 	pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor<EnforcingSigner>)>>,
 	pub monitor_updates: Mutex<HashMap<[u8; 32], Vec<channelmonitor::ChannelMonitorUpdate>>>,
 	pub latest_monitor_update_id: Mutex<HashMap<[u8; 32], (OutPoint, u64, MonitorUpdateId)>>,
-	pub chain_monitor: chainmonitor::ChainMonitor<EnforcingSigner, &'a TestChainSource, &'a chaininterface::BroadcasterInterface, &'a TestFeeEstimator, &'a TestLogger, &'a chainmonitor::Persist<EnforcingSigner>>,
+	pub chain_monitor: chainmonitor::ChainMonitor<EnforcingSigner, &'a TestChainSource, &'a TestBroadcaster, &'a TestFeeEstimator, &'a TestLogger, &'a chainmonitor::Persist<EnforcingSigner>>,
 	pub keys_manager: &'a TestKeysInterface,
 	/// If this is set to Some(), the next update_channel call (not watch_channel) must be a
 	/// ChannelForceClosed event for the given channel_id with should_broadcast set to the given
@@ -196,7 +197,7 @@ pub struct TestChainMonitor<'a> {
 	pub expect_channel_force_closed: Mutex<Option<([u8; 32], bool)>>,
 }
 impl<'a> TestChainMonitor<'a> {
-	pub fn new(chain_source: Option<&'a TestChainSource>, broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a chainmonitor::Persist<EnforcingSigner>, keys_manager: &'a TestKeysInterface) -> Self {
+	pub fn new(chain_source: Option<&'a TestChainSource>, broadcaster: &'a TestBroadcaster, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a chainmonitor::Persist<EnforcingSigner>, keys_manager: &'a TestKeysInterface) -> Self {
 		Self {
 			added_monitors: Mutex::new(Vec::new()),
 			monitor_updates: Mutex::new(HashMap::new()),
@@ -341,8 +342,9 @@ impl TestBroadcaster {
 }
 
 impl chaininterface::BroadcasterInterface for TestBroadcaster {
-	fn broadcast_transactions(&self, txs: &[&Transaction]) {
+	fn broadcast_transactions<T: Borrow<Transaction>>(&self, txs: &[T]) {
 		for tx in txs {
+			let tx = tx.borrow();
 			let lock_time = tx.lock_time.0;
 			assert!(lock_time < 1_500_000_000);
 			if bitcoin::LockTime::from(tx.lock_time).is_block_height() && lock_time > self.blocks.lock().unwrap().last().unwrap().1 {
@@ -353,7 +355,7 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster {
 				}
 			}
 		}
-		let owned_txs: Vec<Transaction> = txs.iter().map(|tx| (*tx).clone()).collect();
+		let owned_txs: Vec<Transaction> = txs.iter().map(|tx| tx.borrow().clone()).collect();
 		self.txn_broadcasted.lock().unwrap().extend(owned_txs);
 	}
 }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for transactions we can just do straight &[Transaction] if its practical, making it generic is a lot of compiler overhead, and we don't need the &[&X] indirection for bindings for transaction specifically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, I don't think we ever return a Vec of transaction references anyway that would force us to clone.

}

/// An enum that represents the speed at which we want a transaction to confirm used for feerate
Expand Down
7 changes: 5 additions & 2 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2327,10 +2327,13 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
where B::Target: BroadcasterInterface,
L::Target: Logger,
{
for tx in self.get_latest_holder_commitment_txn(logger).iter() {
let commit_txs = self.get_latest_holder_commitment_txn(logger);
let mut txs = vec![];
for tx in commit_txs.iter() {
log_info!(logger, "Broadcasting local {}", log_tx!(tx));
broadcaster.broadcast_transaction(tx);
txs.push(tx);
}
broadcaster.broadcast_transactions(&txs);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was the only place I found where we could make use of the change

self.pending_monitor_events.push(MonitorEvent::CommitmentTxConfirmed(self.funding_info.0));
}

Expand Down
8 changes: 4 additions & 4 deletions lightning/src/chain/onchaintx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
OnchainClaim::Tx(tx) => {
let log_start = if bumped_feerate { "Broadcasting RBF-bumped" } else { "Rebroadcasting" };
log_info!(logger, "{} onchain {}", log_start, log_tx!(tx));
broadcaster.broadcast_transaction(&tx);
broadcaster.broadcast_transactions(&[&tx]);
},
#[cfg(anchors)]
OnchainClaim::Event(event) => {
Expand Down Expand Up @@ -767,7 +767,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
let package_id = match claim {
OnchainClaim::Tx(tx) => {
log_info!(logger, "Broadcasting onchain {}", log_tx!(tx));
broadcaster.broadcast_transaction(&tx);
broadcaster.broadcast_transactions(&[&tx]);
tx.txid().into_inner()
},
#[cfg(anchors)]
Expand Down Expand Up @@ -960,7 +960,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
match bump_claim {
OnchainClaim::Tx(bump_tx) => {
log_info!(logger, "Broadcasting RBF-bumped onchain {}", log_tx!(bump_tx));
broadcaster.broadcast_transaction(&bump_tx);
broadcaster.broadcast_transactions(&[&bump_tx]);
},
#[cfg(anchors)]
OnchainClaim::Event(claim_event) => {
Expand Down Expand Up @@ -1046,7 +1046,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
match bump_claim {
OnchainClaim::Tx(bump_tx) => {
log_info!(logger, "Broadcasting onchain {}", log_tx!(bump_tx));
broadcaster.broadcast_transaction(&bump_tx);
broadcaster.broadcast_transactions(&[&bump_tx]);
},
#[cfg(anchors)]
OnchainClaim::Event(claim_event) => {
Expand Down
6 changes: 3 additions & 3 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4480,7 +4480,7 @@ where

if let Some(tx) = funding_broadcastable {
log_info!(self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
self.tx_broadcaster.broadcast_transaction(&tx);
self.tx_broadcaster.broadcast_transactions(&[&tx]);
}

{
Expand Down Expand Up @@ -5012,7 +5012,7 @@ where
};
if let Some(broadcast_tx) = tx {
log_info!(self.logger, "Broadcasting {}", log_tx!(broadcast_tx));
self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
self.tx_broadcaster.broadcast_transactions(&[&broadcast_tx]);
}
if let Some(chan) = chan_option {
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
Expand Down Expand Up @@ -5618,7 +5618,7 @@ where
self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure);

log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
self.tx_broadcaster.broadcast_transaction(&tx);
self.tx_broadcaster.broadcast_transactions(&[&tx]);
update_maps_on_chan_removal!(self, chan);
false
} else { true }
Expand Down
19 changes: 11 additions & 8 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,17 +341,20 @@ impl TestBroadcaster {
}

impl chaininterface::BroadcasterInterface for TestBroadcaster {
fn broadcast_transaction(&self, tx: &Transaction) {
let lock_time = tx.lock_time.0;
assert!(lock_time < 1_500_000_000);
if bitcoin::LockTime::from(tx.lock_time).is_block_height() && lock_time > self.blocks.lock().unwrap().last().unwrap().1 {
for inp in tx.input.iter() {
if inp.sequence != Sequence::MAX {
panic!("We should never broadcast a transaction before its locktime ({})!", tx.lock_time);
fn broadcast_transactions(&self, txs: &[&Transaction]) {
for tx in txs {
let lock_time = tx.lock_time.0;
assert!(lock_time < 1_500_000_000);
if bitcoin::LockTime::from(tx.lock_time).is_block_height() && lock_time > self.blocks.lock().unwrap().last().unwrap().1 {
for inp in tx.input.iter() {
if inp.sequence != Sequence::MAX {
panic!("We should never broadcast a transaction before its locktime ({})!", tx.lock_time);
}
}
}
}
self.txn_broadcasted.lock().unwrap().push(tx.clone());
let owned_txs: Vec<Transaction> = txs.iter().map(|tx| (*tx).clone()).collect();
self.txn_broadcasted.lock().unwrap().extend(owned_txs);
}
}

Expand Down