@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
29
29
use crate :: chain;
30
30
use crate :: chain:: { ChannelMonitorUpdateStatus , Filter , WatchedOutput } ;
31
31
use crate :: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
32
- use crate :: chain:: channelmonitor:: { ChannelMonitor , ChannelMonitorUpdate , Balance , MonitorEvent , TransactionOutputs , LATENCY_GRACE_PERIOD_BLOCKS } ;
32
+ use crate :: chain:: channelmonitor:: { ChannelMonitor , ChannelMonitorUpdate , Balance , MonitorEvent , TransactionOutputs , WithChannelMonitor , LATENCY_GRACE_PERIOD_BLOCKS } ;
33
33
use crate :: chain:: transaction:: { OutPoint , TransactionData } ;
34
34
use crate :: sign:: ecdsa:: WriteableEcdsaChannelSigner ;
35
35
use crate :: events;
@@ -359,6 +359,7 @@ where C::Target: chain::Filter,
359
359
process : FN , funding_outpoint : & OutPoint , monitor_state : & MonitorHolder < ChannelSigner >
360
360
) -> Result < ( ) , ( ) > where FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
361
361
let monitor = & monitor_state. monitor ;
362
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor) ;
362
363
let mut txn_outputs;
363
364
{
364
365
txn_outputs = process ( monitor, txdata) ;
@@ -375,12 +376,12 @@ where C::Target: chain::Filter,
375
376
}
376
377
}
377
378
378
- log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
379
+ log_trace ! ( logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
379
380
match self . persister . update_persisted_channel ( * funding_outpoint, None , monitor, update_id) {
380
381
ChannelMonitorUpdateStatus :: Completed =>
381
- log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
382
+ log_trace ! ( logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
382
383
ChannelMonitorUpdateStatus :: InProgress => {
383
- log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
384
+ log_debug ! ( logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
384
385
pending_monitor_updates. push ( update_id) ;
385
386
} ,
386
387
ChannelMonitorUpdateStatus :: UnrecoverableError => {
@@ -619,8 +620,9 @@ where C::Target: chain::Filter,
619
620
pub fn rebroadcast_pending_claims ( & self ) {
620
621
let monitors = self . monitors . read ( ) . unwrap ( ) ;
621
622
for ( _, monitor_holder) in & * monitors {
623
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor_holder. monitor ) ;
622
624
monitor_holder. monitor . rebroadcast_pending_claims (
623
- & * self . broadcaster , & * self . fee_estimator , & * self . logger
625
+ & * self . broadcaster , & * self . fee_estimator , & logger
624
626
)
625
627
}
626
628
}
@@ -638,17 +640,19 @@ where
638
640
fn filtered_block_connected ( & self , header : & Header , txdata : & TransactionData , height : u32 ) {
639
641
log_debug ! ( self . logger, "New best block {} at height {} provided via block_connected" , header. block_hash( ) , height) ;
640
642
self . process_chain_data ( header, Some ( height) , & txdata, |monitor, txdata| {
643
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor) ;
641
644
monitor. block_connected (
642
- header, txdata, height, & * self . broadcaster , & * self . fee_estimator , & * self . logger )
645
+ header, txdata, height, & * self . broadcaster , & * self . fee_estimator , & logger)
643
646
} ) ;
644
647
}
645
648
646
649
fn block_disconnected ( & self , header : & Header , height : u32 ) {
647
650
let monitor_states = self . monitors . read ( ) . unwrap ( ) ;
648
651
log_debug ! ( self . logger, "Latest block {} at height {} removed via block_disconnected" , header. block_hash( ) , height) ;
649
652
for monitor_state in monitor_states. values ( ) {
653
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor_state. monitor ) ;
650
654
monitor_state. monitor . block_disconnected (
651
- header, height, & * self . broadcaster , & * self . fee_estimator , & * self . logger ) ;
655
+ header, height, & * self . broadcaster , & * self . fee_estimator , & logger) ;
652
656
}
653
657
}
654
658
}
@@ -665,27 +669,30 @@ where
665
669
fn transactions_confirmed ( & self , header : & Header , txdata : & TransactionData , height : u32 ) {
666
670
log_debug ! ( self . logger, "{} provided transactions confirmed at height {} in block {}" , txdata. len( ) , height, header. block_hash( ) ) ;
667
671
self . process_chain_data ( header, None , txdata, |monitor, txdata| {
672
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor) ;
668
673
monitor. transactions_confirmed (
669
- header, txdata, height, & * self . broadcaster , & * self . fee_estimator , & * self . logger )
674
+ header, txdata, height, & * self . broadcaster , & * self . fee_estimator , & logger)
670
675
} ) ;
671
676
}
672
677
673
678
fn transaction_unconfirmed ( & self , txid : & Txid ) {
674
679
log_debug ! ( self . logger, "Transaction {} reorganized out of chain" , txid) ;
675
680
let monitor_states = self . monitors . read ( ) . unwrap ( ) ;
676
681
for monitor_state in monitor_states. values ( ) {
677
- monitor_state. monitor . transaction_unconfirmed ( txid, & * self . broadcaster , & * self . fee_estimator , & * self . logger ) ;
682
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor_state. monitor ) ;
683
+ monitor_state. monitor . transaction_unconfirmed ( txid, & * self . broadcaster , & * self . fee_estimator , & logger) ;
678
684
}
679
685
}
680
686
681
687
fn best_block_updated ( & self , header : & Header , height : u32 ) {
682
688
log_debug ! ( self . logger, "New best block {} at height {} provided via best_block_updated" , header. block_hash( ) , height) ;
683
689
self . process_chain_data ( header, Some ( height) , & [ ] , |monitor, txdata| {
690
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor) ;
684
691
// While in practice there shouldn't be any recursive calls when given empty txdata,
685
692
// it's still possible if a chain::Filter implementation returns a transaction.
686
693
debug_assert ! ( txdata. is_empty( ) ) ;
687
694
monitor. best_block_updated (
688
- header, height, & * self . broadcaster , & * self . fee_estimator , & * self . logger )
695
+ header, height, & * self . broadcaster , & * self . fee_estimator , & logger)
689
696
} ) ;
690
697
}
691
698
@@ -711,29 +718,30 @@ where C::Target: chain::Filter,
711
718
P :: Target : Persist < ChannelSigner > ,
712
719
{
713
720
fn watch_channel ( & self , funding_outpoint : OutPoint , monitor : ChannelMonitor < ChannelSigner > ) -> Result < ChannelMonitorUpdateStatus , ( ) > {
721
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor) ;
714
722
let mut monitors = self . monitors . write ( ) . unwrap ( ) ;
715
723
let entry = match monitors. entry ( funding_outpoint) {
716
724
hash_map:: Entry :: Occupied ( _) => {
717
- log_error ! ( self . logger, "Failed to add new channel data: channel monitor for given outpoint is already present" ) ;
725
+ log_error ! ( logger, "Failed to add new channel data: channel monitor for given outpoint is already present" ) ;
718
726
return Err ( ( ) ) ;
719
727
} ,
720
728
hash_map:: Entry :: Vacant ( e) => e,
721
729
} ;
722
- log_trace ! ( self . logger, "Got new ChannelMonitor for channel {}" , log_funding_info!( monitor) ) ;
730
+ log_trace ! ( logger, "Got new ChannelMonitor for channel {}" , log_funding_info!( monitor) ) ;
723
731
let update_id = MonitorUpdateId :: from_new_monitor ( & monitor) ;
724
732
let mut pending_monitor_updates = Vec :: new ( ) ;
725
733
let persist_res = self . persister . persist_new_channel ( funding_outpoint, & monitor, update_id) ;
726
734
match persist_res {
727
735
ChannelMonitorUpdateStatus :: InProgress => {
728
- log_info ! ( self . logger, "Persistence of new ChannelMonitor for channel {} in progress" , log_funding_info!( monitor) ) ;
736
+ log_info ! ( logger, "Persistence of new ChannelMonitor for channel {} in progress" , log_funding_info!( monitor) ) ;
729
737
pending_monitor_updates. push ( update_id) ;
730
738
} ,
731
739
ChannelMonitorUpdateStatus :: Completed => {
732
- log_info ! ( self . logger, "Persistence of new ChannelMonitor for channel {} completed" , log_funding_info!( monitor) ) ;
740
+ log_info ! ( logger, "Persistence of new ChannelMonitor for channel {} completed" , log_funding_info!( monitor) ) ;
733
741
} ,
734
742
ChannelMonitorUpdateStatus :: UnrecoverableError => {
735
743
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
736
- log_error ! ( self . logger, "{}" , err_str) ;
744
+ log_error ! ( logger, "{}" , err_str) ;
737
745
panic ! ( "{}" , err_str) ;
738
746
} ,
739
747
}
@@ -750,8 +758,9 @@ where C::Target: chain::Filter,
750
758
751
759
fn update_channel ( & self , funding_txo : OutPoint , update : & ChannelMonitorUpdate ) -> ChannelMonitorUpdateStatus {
752
760
// Update the monitor that watches the channel referred to by the given outpoint.
753
- let monitors = self . monitors . read ( ) . unwrap ( ) ;
754
- let ret = match monitors. get ( & funding_txo) {
761
+ let monitors_lock = self . monitors . read ( ) . unwrap ( ) ;
762
+ let monitors = monitors_lock. deref ( ) ;
763
+ match monitors. get ( & funding_txo) {
755
764
None => {
756
765
log_error ! ( self . logger, "Failed to update channel monitor: no such monitor registered" ) ;
757
766
@@ -765,7 +774,8 @@ where C::Target: chain::Filter,
765
774
} ,
766
775
Some ( monitor_state) => {
767
776
let monitor = & monitor_state. monitor ;
768
- log_trace ! ( self . logger, "Updating ChannelMonitor for channel {}" , log_funding_info!( monitor) ) ;
777
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor) ;
778
+ log_trace ! ( logger, "Updating ChannelMonitor for channel {}" , log_funding_info!( monitor) ) ;
769
779
let update_res = monitor. update_monitor ( update, & self . broadcaster , & self . fee_estimator , & self . logger ) ;
770
780
771
781
let update_id = MonitorUpdateId :: from_monitor_update ( update) ;
@@ -776,49 +786,48 @@ where C::Target: chain::Filter,
776
786
// We don't want to persist a `monitor_update` which results in a failure to apply later
777
787
// while reading `channel_monitor` with updates from storage. Instead, we should persist
778
788
// the entire `channel_monitor` here.
779
- log_warn ! ( self . logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor" , log_funding_info!( monitor) ) ;
789
+ log_warn ! ( logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor" , log_funding_info!( monitor) ) ;
780
790
self . persister . update_persisted_channel ( funding_txo, None , monitor, update_id)
781
791
} else {
782
792
self . persister . update_persisted_channel ( funding_txo, Some ( update) , monitor, update_id)
783
793
} ;
784
794
match persist_res {
785
795
ChannelMonitorUpdateStatus :: InProgress => {
786
796
pending_monitor_updates. push ( update_id) ;
787
- log_debug ! ( self . logger, "Persistence of ChannelMonitorUpdate for channel {} in progress" , log_funding_info!( monitor) ) ;
797
+ log_debug ! ( logger, "Persistence of ChannelMonitorUpdate for channel {} in progress" , log_funding_info!( monitor) ) ;
788
798
} ,
789
799
ChannelMonitorUpdateStatus :: Completed => {
790
- log_debug ! ( self . logger, "Persistence of ChannelMonitorUpdate for channel {} completed" , log_funding_info!( monitor) ) ;
800
+ log_debug ! ( logger, "Persistence of ChannelMonitorUpdate for channel {} completed" , log_funding_info!( monitor) ) ;
801
+ } ,
802
+ ChannelMonitorUpdateStatus :: UnrecoverableError => {
803
+ // Take the monitors lock for writing so that we poison it and any future
804
+ // operations going forward fail immediately.
805
+ core:: mem:: drop ( monitors) ;
806
+ let _poison = self . monitors . write ( ) . unwrap ( ) ;
807
+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
808
+ log_error ! ( logger, "{}" , err_str) ;
809
+ panic ! ( "{}" , err_str) ;
791
810
} ,
792
- ChannelMonitorUpdateStatus :: UnrecoverableError => { /* we'll panic in a moment */ } ,
793
811
}
794
812
if update_res. is_err ( ) {
795
813
ChannelMonitorUpdateStatus :: InProgress
796
814
} else {
797
815
persist_res
798
816
}
799
817
}
800
- } ;
801
- if let ChannelMonitorUpdateStatus :: UnrecoverableError = ret {
802
- // Take the monitors lock for writing so that we poison it and any future
803
- // operations going forward fail immediately.
804
- core:: mem:: drop ( monitors) ;
805
- let _poison = self . monitors . write ( ) . unwrap ( ) ;
806
- let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
807
- log_error ! ( self . logger, "{}" , err_str) ;
808
- panic ! ( "{}" , err_str) ;
809
818
}
810
- ret
811
819
}
812
820
813
821
fn release_pending_monitor_events ( & self ) -> Vec < ( OutPoint , Vec < MonitorEvent > , Option < PublicKey > ) > {
814
822
let mut pending_monitor_events = self . pending_monitor_events . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
815
823
for monitor_state in self . monitors . read ( ) . unwrap ( ) . values ( ) {
824
+ let logger = WithChannelMonitor :: from ( & self . logger , & monitor_state. monitor ) ;
816
825
let is_pending_monitor_update = monitor_state. has_pending_chainsync_updates ( & monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ) ;
817
826
if !is_pending_monitor_update || monitor_state. last_chain_persist_height . load ( Ordering :: Acquire ) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self . highest_chain_height . load ( Ordering :: Acquire ) {
818
827
if is_pending_monitor_update {
819
- log_error ! ( self . logger, "A ChannelMonitor sync took longer than {} blocks to complete." , LATENCY_GRACE_PERIOD_BLOCKS ) ;
820
- log_error ! ( self . logger, " To avoid funds-loss, we are allowing monitor updates to be released." ) ;
821
- log_error ! ( self . logger, " This may cause duplicate payment events to be generated." ) ;
828
+ log_error ! ( logger, "A ChannelMonitor sync took longer than {} blocks to complete." , LATENCY_GRACE_PERIOD_BLOCKS ) ;
829
+ log_error ! ( logger, " To avoid funds-loss, we are allowing monitor updates to be released." ) ;
830
+ log_error ! ( logger, " This may cause duplicate payment events to be generated." ) ;
822
831
}
823
832
let monitor_events = monitor_state. monitor . get_and_clear_pending_monitor_events ( ) ;
824
833
if monitor_events. len ( ) > 0 {
0 commit comments