@@ -39,7 +39,7 @@ use util::events::EventHandler;
39
39
use ln:: channelmanager:: ChannelDetails ;
40
40
41
41
use prelude:: * ;
42
- use sync:: RwLock ;
42
+ use sync:: { RwLock , Mutex } ;
43
43
use core:: ops:: Deref ;
44
44
45
45
/// An implementation of [`chain::Watch`] for monitoring channels.
@@ -60,6 +60,18 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
60
60
{
61
61
/// The monitors
62
62
pub monitors : RwLock < HashMap < OutPoint , ChannelMonitor < ChannelSigner > > > ,
63
+ /// Beyond the synchronization of `monitors` itself, we cannot handle user events until after
64
+ /// any chain updates have been stored on disk. This mutex is used to provide mutual exclusion
65
+ /// of event-processing/block-/transaction-connection.
66
+ /// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
67
+ /// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
68
+ /// the pending payment entry, and then reloading before the monitor is persisted, resulting in
69
+ /// the ChannelManager re-adding the same payment entry, before the same block is replayed,
70
+ /// resulting in a duplicate PaymentSent event.
71
+ ///
72
+ /// Note that this is set to true if any persistence fails, at which point *no events must be
73
+ /// processed* (and the user has indicated they will shut down very very soon).
74
+ event_mutex : Mutex < bool > ,
63
75
chain_source : Option < C > ,
64
76
broadcaster : T ,
65
77
logger : L ,
@@ -88,26 +100,43 @@ where C::Target: chain::Filter,
88
100
FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
89
101
{
90
102
let mut dependent_txdata = Vec :: new ( ) ;
91
- let monitors = self . monitors . read ( ) . unwrap ( ) ;
92
- for monitor in monitors. values ( ) {
93
- let mut txn_outputs = process ( monitor, txdata) ;
103
+ {
104
+ let monitors = self . monitors . write ( ) . unwrap ( ) ;
105
+ for ( funding_outpoint, monitor) in monitors. iter ( ) {
106
+ let mut txn_outputs;
107
+ {
108
+ let mut ev_lock = self . event_mutex . lock ( ) . unwrap ( ) ;
109
+ txn_outputs = process ( monitor, txdata) ;
110
+ log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
111
+ if let Err ( ( ) ) = self . persister . sync_persisted_channel ( * funding_outpoint, monitor) {
112
+ // If we fail to persist a monitor, stop processing events, assuming we'll
113
+ // be shutting down soon (and the events can be re-generated on chain
114
+ // replay).
115
+ * ev_lock = true ;
116
+ log_error ! ( self . logger, "Failed to sync Channel Monitor for channel {}!" , log_funding_info!( monitor) ) ;
117
+ log_error ! ( self . logger, " The LDK-based application should now be shutting down!" ) ;
118
+ } else {
119
+ log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
120
+ }
121
+ }
94
122
95
- // Register any new outputs with the chain source for filtering, storing any dependent
96
- // transactions from within the block that previously had not been included in txdata.
97
- if let Some ( ref chain_source) = self . chain_source {
98
- let block_hash = header. block_hash ( ) ;
99
- for ( txid, mut outputs) in txn_outputs. drain ( ..) {
100
- for ( idx, output) in outputs. drain ( ..) {
101
- // Register any new outputs with the chain source for filtering and recurse
102
- // if it indicates that there are dependent transactions within the block
103
- // that had not been previously included in txdata.
104
- let output = WatchedOutput {
105
- block_hash : Some ( block_hash) ,
106
- outpoint : OutPoint { txid, index : idx as u16 } ,
107
- script_pubkey : output. script_pubkey ,
108
- } ;
109
- if let Some ( tx) = chain_source. register_output ( output) {
110
- dependent_txdata. push ( tx) ;
123
+ // Register any new outputs with the chain source for filtering, storing any dependent
124
+ // transactions from within the block that previously had not been included in txdata.
125
+ if let Some ( ref chain_source) = self . chain_source {
126
+ let block_hash = header. block_hash ( ) ;
127
+ for ( txid, mut outputs) in txn_outputs. drain ( ..) {
128
+ for ( idx, output) in outputs. drain ( ..) {
129
+ // Register any new outputs with the chain source for filtering and recurse
130
+ // if it indicates that there are dependent transactions within the block
131
+ // that had not been previously included in txdata.
132
+ let output = WatchedOutput {
133
+ block_hash : Some ( block_hash) ,
134
+ outpoint : OutPoint { txid, index : idx as u16 } ,
135
+ script_pubkey : output. script_pubkey ,
136
+ } ;
137
+ if let Some ( tx) = chain_source. register_output ( output) {
138
+ dependent_txdata. push ( tx) ;
139
+ }
111
140
}
112
141
}
113
142
}
@@ -133,6 +162,7 @@ where C::Target: chain::Filter,
133
162
pub fn new ( chain_source : Option < C > , broadcaster : T , logger : L , feeest : F , persister : P ) -> Self {
134
163
Self {
135
164
monitors : RwLock :: new ( HashMap :: new ( ) ) ,
165
+ event_mutex : Mutex :: new ( false ) ,
136
166
chain_source,
137
167
broadcaster,
138
168
logger,
@@ -331,6 +361,13 @@ where C::Target: chain::Filter,
331
361
}
332
362
333
363
fn release_pending_monitor_events ( & self ) -> Vec < MonitorEvent > {
364
+ let ev_lock = self . event_mutex . lock ( ) . unwrap ( ) ;
365
+ if * ev_lock {
366
+ log_error ! ( self . logger, "Failed to sync a Channel Monitor, refusing to provide monitor events!" ) ;
367
+ log_error ! ( self . logger, " The LDK-based application should now be shutting down!" ) ;
368
+ return Vec :: new ( ) ;
369
+ }
370
+
334
371
let mut pending_monitor_events = Vec :: new ( ) ;
335
372
for monitor in self . monitors . read ( ) . unwrap ( ) . values ( ) {
336
373
pending_monitor_events. append ( & mut monitor. get_and_clear_pending_monitor_events ( ) ) ;
@@ -353,6 +390,13 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
353
390
///
354
391
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
355
392
fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
393
+ let ev_lock = self . event_mutex . lock ( ) . unwrap ( ) ;
394
+ if * ev_lock {
395
+ log_error ! ( self . logger, "Failed to sync a Channel Monitor, refusing to provide monitor events!" ) ;
396
+ log_error ! ( self . logger, " The LDK-based application should now be shutting down!" ) ;
397
+ return ;
398
+ }
399
+
356
400
let mut pending_events = Vec :: new ( ) ;
357
401
for monitor in self . monitors . read ( ) . unwrap ( ) . values ( ) {
358
402
pending_events. append ( & mut monitor. get_and_clear_pending_events ( ) ) ;
0 commit comments