@@ -263,82 +263,67 @@ where C::Target: chain::Filter,
263
263
where
264
264
FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
265
265
{
266
- let mut dependent_txdata = Vec :: new ( ) ;
267
- {
268
- let monitor_states = self . monitors . write ( ) . unwrap ( ) ;
269
- if let Some ( height) = best_height {
270
- // If the best block height is being updated, update highest_chain_height under the
271
- // monitors write lock.
272
- let old_height = self . highest_chain_height . load ( Ordering :: Acquire ) ;
273
- let new_height = height as usize ;
274
- if new_height > old_height {
275
- self . highest_chain_height . store ( new_height, Ordering :: Release ) ;
276
- }
266
+ let monitor_states = self . monitors . write ( ) . unwrap ( ) ;
267
+ if let Some ( height) = best_height {
268
+ // If the best block height is being updated, update highest_chain_height under the
269
+ // monitors write lock.
270
+ let old_height = self . highest_chain_height . load ( Ordering :: Acquire ) ;
271
+ let new_height = height as usize ;
272
+ if new_height > old_height {
273
+ self . highest_chain_height . store ( new_height, Ordering :: Release ) ;
277
274
}
275
+ }
278
276
279
- for ( funding_outpoint, monitor_state) in monitor_states. iter ( ) {
280
- let monitor = & monitor_state. monitor ;
281
- let mut txn_outputs;
282
- {
283
- txn_outputs = process ( monitor, txdata) ;
284
- let update_id = MonitorUpdateId {
285
- contents : UpdateOrigin :: ChainSync ( self . sync_persistence_id . get_increment ( ) ) ,
286
- } ;
287
- let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
288
- if let Some ( height) = best_height {
289
- if !monitor_state. has_pending_chainsync_updates ( & pending_monitor_updates) {
290
- // If there are not ChainSync persists awaiting completion, go ahead and
291
- // set last_chain_persist_height here - we wouldn't want the first
292
- // TemporaryFailure to always immediately be considered "overly delayed".
293
- monitor_state. last_chain_persist_height . store ( height as usize , Ordering :: Release ) ;
294
- }
277
+ for ( funding_outpoint, monitor_state) in monitor_states. iter ( ) {
278
+ let monitor = & monitor_state. monitor ;
279
+ let mut txn_outputs;
280
+ {
281
+ txn_outputs = process ( monitor, txdata) ;
282
+ let update_id = MonitorUpdateId {
283
+ contents : UpdateOrigin :: ChainSync ( self . sync_persistence_id . get_increment ( ) ) ,
284
+ } ;
285
+ let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
286
+ if let Some ( height) = best_height {
287
+ if !monitor_state. has_pending_chainsync_updates ( & pending_monitor_updates) {
288
+ // If there are not ChainSync persists awaiting completion, go ahead and
289
+ // set last_chain_persist_height here - we wouldn't want the first
290
+ // TemporaryFailure to always immediately be considered "overly delayed".
291
+ monitor_state. last_chain_persist_height . store ( height as usize , Ordering :: Release ) ;
295
292
}
293
+ }
296
294
297
- log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
298
- match self . persister . update_persisted_channel ( * funding_outpoint, & None , monitor, update_id) {
299
- Ok ( ( ) ) =>
300
- log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
301
- Err ( ChannelMonitorUpdateErr :: PermanentFailure ) => {
302
- monitor_state. channel_perm_failed . store ( true , Ordering :: Release ) ;
303
- self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( * funding_outpoint, vec ! [ MonitorEvent :: UpdateFailed ( * funding_outpoint) ] , monitor. get_counterparty_node_id ( ) ) ) ;
304
- } ,
305
- Err ( ChannelMonitorUpdateErr :: TemporaryFailure ) => {
306
- log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
307
- pending_monitor_updates. push ( update_id) ;
308
- } ,
309
- }
295
+ log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
296
+ match self . persister . update_persisted_channel ( * funding_outpoint, & None , monitor, update_id) {
297
+ Ok ( ( ) ) =>
298
+ log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
299
+ Err ( ChannelMonitorUpdateErr :: PermanentFailure ) => {
300
+ monitor_state. channel_perm_failed . store ( true , Ordering :: Release ) ;
301
+ self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( * funding_outpoint, vec ! [ MonitorEvent :: UpdateFailed ( * funding_outpoint) ] , monitor. get_counterparty_node_id ( ) ) ) ;
302
+ } ,
303
+ Err ( ChannelMonitorUpdateErr :: TemporaryFailure ) => {
304
+ log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
305
+ pending_monitor_updates. push ( update_id) ;
306
+ } ,
310
307
}
308
+ }
311
309
312
- // Register any new outputs with the chain source for filtering, storing any dependent
313
- // transactions from within the block that previously had not been included in txdata.
314
- if let Some ( ref chain_source) = self . chain_source {
315
- let block_hash = header. block_hash ( ) ;
316
- for ( txid, mut outputs) in txn_outputs. drain ( ..) {
317
- for ( idx, output) in outputs. drain ( ..) {
318
- // Register any new outputs with the chain source for filtering and recurse
319
- // if it indicates that there are dependent transactions within the block
320
- // that had not been previously included in txdata.
321
- let output = WatchedOutput {
322
- block_hash : Some ( block_hash) ,
323
- outpoint : OutPoint { txid, index : idx as u16 } ,
324
- script_pubkey : output. script_pubkey ,
325
- } ;
326
- if let Some ( tx) = chain_source. register_output ( output) {
327
- dependent_txdata. push ( tx) ;
328
- }
329
- }
310
+ // Register any new outputs with the chain source for filtering, storing any dependent
311
+ // transactions from within the block that previously had not been included in txdata.
312
+ if let Some ( ref chain_source) = self . chain_source {
313
+ let block_hash = header. block_hash ( ) ;
314
+ for ( txid, mut outputs) in txn_outputs. drain ( ..) {
315
+ for ( idx, output) in outputs. drain ( ..) {
316
+ // Register any new outputs with the chain source for filtering
317
+ let output = WatchedOutput {
318
+ block_hash : Some ( block_hash) ,
319
+ outpoint : OutPoint { txid, index : idx as u16 } ,
320
+ script_pubkey : output. script_pubkey ,
321
+ } ;
322
+ chain_source. register_output ( output)
330
323
}
331
324
}
332
325
}
333
326
}
334
-
335
- // Recursively call for any dependent transactions that were identified by the chain source.
336
- if !dependent_txdata. is_empty ( ) {
337
- dependent_txdata. sort_unstable_by_key ( |( index, _tx) | * index) ;
338
- dependent_txdata. dedup_by_key ( |( index, _tx) | * index) ;
339
- let txdata: Vec < _ > = dependent_txdata. iter ( ) . map ( |( index, tx) | ( * index, tx) ) . collect ( ) ;
340
- self . process_chain_data ( header, None , & txdata, process) ; // We skip the best height the second go-around
341
- }
342
327
}
343
328
344
329
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
@@ -746,50 +731,6 @@ mod tests {
746
731
use ln:: msgs:: ChannelMessageHandler ;
747
732
use util:: errors:: APIError ;
748
733
use util:: events:: { ClosureReason , MessageSendEvent , MessageSendEventsProvider } ;
749
- use util:: test_utils:: { OnRegisterOutput , TxOutReference } ;
750
-
751
- /// Tests that in-block dependent transactions are processed by `block_connected` when not
752
- /// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance,
753
- /// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the
754
- /// commitment transaction itself. An Electrum client may filter the commitment transaction but
755
- /// needs to return the HTLC transaction so it can be processed.
756
- #[ test]
757
- fn connect_block_checks_dependent_transactions ( ) {
758
- let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
759
- let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
760
- let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
761
- let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
762
- let channel = create_announced_chan_between_nodes (
763
- & nodes, 0 , 1 , InitFeatures :: known ( ) , InitFeatures :: known ( ) ) ;
764
-
765
- // Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions.
766
- let ( commitment_tx, htlc_tx) = {
767
- let payment_preimage = route_payment ( & nodes[ 0 ] , & vec ! ( & nodes[ 1 ] ) [ ..] , 5_000_000 ) . 0 ;
768
- let mut txn = get_local_commitment_txn ! ( nodes[ 0 ] , channel. 2 ) ;
769
- claim_payment ( & nodes[ 0 ] , & vec ! ( & nodes[ 1 ] ) [ ..] , payment_preimage) ;
770
-
771
- assert_eq ! ( txn. len( ) , 2 ) ;
772
- ( txn. remove ( 0 ) , txn. remove ( 0 ) )
773
- } ;
774
-
775
- // Set expectations on nodes[1]'s chain source to return dependent transactions.
776
- let htlc_output = TxOutReference ( commitment_tx. clone ( ) , 0 ) ;
777
- let to_local_output = TxOutReference ( commitment_tx. clone ( ) , 1 ) ;
778
- let htlc_timeout_output = TxOutReference ( htlc_tx. clone ( ) , 0 ) ;
779
- nodes[ 1 ] . chain_source
780
- . expect ( OnRegisterOutput { with : htlc_output, returns : Some ( ( 1 , htlc_tx) ) } )
781
- . expect ( OnRegisterOutput { with : to_local_output, returns : None } )
782
- . expect ( OnRegisterOutput { with : htlc_timeout_output, returns : None } ) ;
783
-
784
- // Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain
785
- // source should return the dependent HTLC transaction when the HTLC output is registered.
786
- mine_transaction ( & nodes[ 1 ] , & commitment_tx) ;
787
-
788
- // Clean up so uninteresting assertions don't fail.
789
- check_added_monitors ! ( nodes[ 1 ] , 1 ) ;
790
- nodes[ 1 ] . node . get_and_clear_pending_msg_events ( ) ;
791
- nodes[ 1 ] . node . get_and_clear_pending_events ( ) ;
792
- }
793
734
794
735
#[ test]
795
736
fn test_async_ooo_offchain_updates ( ) {
0 commit comments