Skip to content

Commit f3a8a1c

Browse files
Add monitor_did_broadcast method to ManyChannelMonitor
This allows us to replace the call to channel.channel_monitor() .would_broadcast_at_height() in ChannelManager. This allows us to next get rid of the Channel's copy of the ChannelMonitor.
1 parent 8ac0992 commit f3a8a1c

File tree

6 files changed

+97
-25
lines changed

6 files changed

+97
-25
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
138138
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
139139
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
140140
}
141+
142+
fn get_monitor_did_broadcast(&self, funding_txo: OutPoint) -> Result<bool, channelmonitor::ChannelMonitorUpdateErr> {
143+
self.simple_monitor.get_monitor_did_broadcast(funding_txo)
144+
}
141145
}
142146

143147
struct KeyProvider {

lightning/src/ln/channelmanager.rs

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2992,9 +2992,39 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29922992
}
29932993
}
29942994

2995+
let mut failed_channels = Vec::new();
2996+
let mut channel_lock = self.channel_state.lock().unwrap();
2997+
let channel_state = &mut *channel_lock;
2998+
let pending_msg_events = &mut channel_state.pending_msg_events;
2999+
let short_to_id = &mut channel_state.short_to_id;
3000+
channel_state.by_id.retain(|_, chan| {
3001+
if let Some(funding_tx) = chan.get_funding_txo() {
3002+
match self.monitor.get_monitor_did_broadcast(funding_tx) {
3003+
Ok(broadcasted) => {
3004+
if broadcasted {
3005+
if let Some(short_id) = chan.get_short_channel_id() {
3006+
short_to_id.remove(&short_id);
3007+
}
3008+
failed_channels.push(chan.force_shutdown(false));
3009+
if let Ok(update) = self.get_channel_update(&chan) {
3010+
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
3011+
msg: update
3012+
});
3013+
}
3014+
return false
3015+
}
3016+
},
3017+
_ => {},
3018+
}
3019+
}
3020+
true
3021+
});
3022+
for failure in failed_channels.drain(..) {
3023+
self.finish_force_close_channel(failure);
3024+
}
3025+
29953026
let mut ret = Vec::new();
2996-
let mut channel_state = self.channel_state.lock().unwrap();
2997-
mem::swap(&mut ret, &mut channel_state.pending_msg_events);
3027+
mem::swap(&mut ret, pending_msg_events);
29983028
ret
29993029
}
30003030
}
@@ -3104,21 +3134,6 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
31043134
}
31053135
}
31063136
}
3107-
if channel.is_funding_initiated() && channel.channel_monitor().would_broadcast_at_height(height, &self.logger) {
3108-
if let Some(short_id) = channel.get_short_channel_id() {
3109-
short_to_id.remove(&short_id);
3110-
}
3111-
// If would_broadcast_at_height() is true, the channel_monitor will broadcast
3112-
// the latest local tx for us, so we should skip that here (it doesn't really
3113-
// hurt anything, but does make tests a bit simpler).
3114-
failed_channels.push(channel.force_shutdown(false));
3115-
if let Ok(update) = self.get_channel_update(&channel) {
3116-
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
3117-
msg: update
3118-
});
3119-
}
3120-
return false;
3121-
}
31223137
true
31233138
});
31243139

lightning/src/ln/channelmonitor.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,13 @@ impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L:
299299
}
300300
pending_htlcs_updated
301301
}
302+
303+
fn get_monitor_did_broadcast(&self, funding_txo: OutPoint) -> Result<bool, ChannelMonitorUpdateErr> {
304+
match self.monitors.lock().unwrap().get(&funding_txo) {
305+
Some(monitor) => Ok(monitor.broadcasted()),
306+
None => Err(ChannelMonitorUpdateErr::PermanentFailure)
307+
}
308+
}
302309
}
303310

304311
impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref, C: Deref> events::EventsProvider for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
@@ -813,6 +820,9 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
813820
// may occur, and we fail any such monitor updates.
814821
local_tx_signed: bool,
815822

823+
// Whether the commitment transaction associated with this channel has been broadcasted.
824+
broadcasted: bool,
825+
816826
// We simply modify last_block_hash in Channel's block_connected so that serialization is
817827
// consistent but hopefully the users' copy handles block_connected in a consistent way.
818828
// (we do *not*, however, update them in update_monitor to ensure any local user copies keep
@@ -884,6 +894,9 @@ pub trait ManyChannelMonitor: Send + Sync {
884894
/// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
885895
/// the full list.
886896
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
897+
898+
/// Gets whether the monitor broadcasted the channel's commitment transaction.
899+
fn get_monitor_did_broadcast(&self, funding_txo: OutPoint) -> Result<bool, ChannelMonitorUpdateErr>;
887900
}
888901

889902
#[cfg(any(test, feature = "fuzztarget"))]
@@ -1113,6 +1126,7 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
11131126

11141127
self.lockdown_from_offchain.write(writer)?;
11151128
self.local_tx_signed.write(writer)?;
1129+
self.broadcasted.write(writer)?;
11161130

11171131
Ok(())
11181132
}
@@ -1197,6 +1211,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
11971211

11981212
lockdown_from_offchain: false,
11991213
local_tx_signed: false,
1214+
broadcasted: false,
12001215

12011216
last_block_hash: Default::default(),
12021217
secp_ctx: Secp256k1::new(),
@@ -1350,6 +1365,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
13501365
{
13511366
for tx in self.get_latest_local_commitment_txn(logger).iter() {
13521367
broadcaster.broadcast_transaction(tx);
1368+
self.broadcasted = true;
13531369
}
13541370
}
13551371

@@ -1966,7 +1982,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
19661982
}
19671983
}
19681984
}
1969-
self.onchain_tx_handler.block_connected(txn_matched, claimable_outpoints, height, &*broadcaster, &*fee_estimator, &*logger);
1985+
self.broadcasted = self.onchain_tx_handler.block_connected(txn_matched, claimable_outpoints, height, &*broadcaster, &*fee_estimator, &*logger);
19701986

19711987
self.last_block_hash = block_hash.clone();
19721988
for &(ref txid, ref output_scripts) in watch_outputs.iter() {
@@ -1988,11 +2004,16 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
19882004
//- maturing spendable output has transaction paying us has been disconnected
19892005
}
19902006

1991-
self.onchain_tx_handler.block_disconnected(height, broadcaster, fee_estimator, logger);
2007+
self.broadcasted = self.onchain_tx_handler.block_disconnected(height, broadcaster, fee_estimator, logger);
19922008

19932009
self.last_block_hash = block_hash.clone();
19942010
}
19952011

2012+
/// Whether our local commitment transaction has been broadcasted.
2013+
fn broadcasted(&self) -> bool {
2014+
self.broadcasted
2015+
}
2016+
19962017
pub(super) fn would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
19972018
// We need to consider all HTLCs which are:
19982019
// * in any unrevoked remote commitment transaction, as they could broadcast said
@@ -2483,6 +2504,7 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for (BlockHash, ChannelMonitor
24832504

24842505
let lockdown_from_offchain = Readable::read(reader)?;
24852506
let local_tx_signed = Readable::read(reader)?;
2507+
let broadcasted = Readable::read(reader)?;
24862508

24872509
Ok((last_block_hash.clone(), ChannelMonitor {
24882510
latest_update_id,
@@ -2527,6 +2549,8 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for (BlockHash, ChannelMonitor
25272549
lockdown_from_offchain,
25282550
local_tx_signed,
25292551

2552+
broadcasted,
2553+
25302554
last_block_hash,
25312555
secp_ctx: Secp256k1::new(),
25322556
}))

lightning/src/ln/functional_tests.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2420,13 +2420,21 @@ fn channel_monitor_network_test() {
24202420
// CLTV expires at TEST_FINAL_CLTV + 1 (current height) + 1 (added in send_payment for
24212421
// buffer space).
24222422

2423-
{
2423+
let (close_chan_update_1, close_chan_update_2) = {
24242424
let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
24252425
nodes[3].block_notifier.block_connected_checked(&header, 2, &Vec::new()[..], &[0; 0]);
24262426
for i in 3..TEST_FINAL_CLTV + 2 + LATENCY_GRACE_PERIOD_BLOCKS + 1 {
24272427
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
24282428
nodes[3].block_notifier.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
24292429
}
2430+
let events = nodes[3].node.get_and_clear_pending_msg_events();
2431+
assert_eq!(events.len(), 1);
2432+
let close_chan_update_1 = match events[0] {
2433+
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
2434+
msg.clone()
2435+
},
2436+
_ => panic!("Unexpected event"),
2437+
};
24302438
check_added_monitors!(nodes[3], 1);
24312439

24322440
// Clear bumped claiming txn spending node 2 commitment tx. Bumped txn are generated after reaching some height timer.
@@ -2451,16 +2459,25 @@ fn channel_monitor_network_test() {
24512459
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
24522460
nodes[4].block_notifier.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
24532461
}
2454-
2462+
let events = nodes[4].node.get_and_clear_pending_msg_events();
2463+
assert_eq!(events.len(), 1);
2464+
let close_chan_update_2 = match events[0] {
2465+
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
2466+
msg.clone()
2467+
},
2468+
_ => panic!("Unexpected event"),
2469+
};
24552470
check_added_monitors!(nodes[4], 1);
24562471
test_txn_broadcast(&nodes[4], &chan_4, None, HTLCType::SUCCESS);
24572472

24582473
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
24592474
nodes[4].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone()] }, TEST_FINAL_CLTV - 5);
24602475

24612476
check_preimage_claim(&nodes[4], &node_txn);
2462-
}
2463-
get_announce_close_broadcast_events(&nodes, 3, 4);
2477+
(close_chan_update_1, close_chan_update_2)
2478+
};
2479+
nodes[3].net_graph_msg_handler.handle_channel_update(&close_chan_update_2).unwrap();
2480+
nodes[4].net_graph_msg_handler.handle_channel_update(&close_chan_update_1).unwrap();
24642481
assert_eq!(nodes[3].node.list_channels().len(), 0);
24652482
assert_eq!(nodes[4].node.list_channels().len(), 0);
24662483
}

lightning/src/ln/onchaintx.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
657657
None
658658
}
659659

660-
pub(super) fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, txn_matched: &[&Transaction], claimable_outpoints: Vec<ClaimRequest>, height: u32, broadcaster: B, fee_estimator: F, logger: L)
660+
pub(super) fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, txn_matched: &[&Transaction], claimable_outpoints: Vec<ClaimRequest>, height: u32, broadcaster: B, fee_estimator: F, logger: L) -> bool
661661
where B::Target: BroadcasterInterface,
662662
F::Target: FeeEstimator,
663663
L::Target: Logger,
@@ -666,6 +666,7 @@ impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
666666
let mut new_claims = Vec::new();
667667
let mut aggregated_claim = HashMap::new();
668668
let mut aggregated_soonest = ::std::u32::MAX;
669+
let mut broadcasted_tx = false;
669670

670671
// Try to aggregate outputs if their timelock expiration isn't imminent (absolute_timelock
671672
// <= CLTV_SHARED_CLAIM_BUFFER) and they don't require an immediate nLockTime (aggregable).
@@ -702,6 +703,7 @@ impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
702703
self.pending_claim_requests.insert(txid, claim_material);
703704
log_trace!(logger, "Broadcast onchain {}", log_tx!(tx));
704705
broadcaster.broadcast_transaction(&tx);
706+
broadcasted_tx = true;
705707
}
706708
}
707709

@@ -827,13 +829,16 @@ impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
827829
}
828830
}
829831
}
832+
833+
broadcasted_tx
830834
}
831835

832-
pub(super) fn block_disconnected<B: Deref, F: Deref, L: Deref>(&mut self, height: u32, broadcaster: B, fee_estimator: F, logger: L)
836+
pub(super) fn block_disconnected<B: Deref, F: Deref, L: Deref>(&mut self, height: u32, broadcaster: B, fee_estimator: F, logger: L) -> bool
833837
where B::Target: BroadcasterInterface,
834838
F::Target: FeeEstimator,
835839
L::Target: Logger,
836840
{
841+
let mut broadcasted_tx = false;
837842
let mut bump_candidates = HashMap::new();
838843
if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
839844
//- our claim tx on a commitment tx output
@@ -859,6 +864,7 @@ impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
859864
claim_material.height_timer = new_timer;
860865
claim_material.feerate_previous = new_feerate;
861866
broadcaster.broadcast_transaction(&bump_tx);
867+
broadcasted_tx = true;
862868
}
863869
}
864870
for (ancestor_claim_txid, claim_material) in bump_candidates.drain() {
@@ -875,6 +881,8 @@ impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
875881
for req in remove_request {
876882
self.pending_claim_requests.remove(&req);
877883
}
884+
885+
broadcasted_tx
878886
}
879887

880888
pub(super) fn provide_latest_local_tx(&mut self, tx: LocalCommitmentTransaction) -> Result<(), ()> {

lightning/src/util/test_utils.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ impl<'a> channelmonitor::ManyChannelMonitor for TestChannelMonitor<'a> {
132132
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
133133
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
134134
}
135+
136+
fn get_monitor_did_broadcast(&self, funding_txo: OutPoint) -> Result<bool, channelmonitor::ChannelMonitorUpdateErr> {
137+
self.simple_monitor.get_monitor_did_broadcast(funding_txo)
138+
}
135139
}
136140

137141
pub struct TestBroadcaster {

0 commit comments

Comments
 (0)