@@ -14,6 +14,7 @@ use lightning::chain::keysinterface::{Sign, KeysInterface};
14
14
use lightning:: ln:: channelmanager:: ChannelManager ;
15
15
use lightning:: ln:: msgs:: { ChannelMessageHandler , RoutingMessageHandler } ;
16
16
use lightning:: ln:: peer_handler:: { PeerManager , SocketDescriptor } ;
17
+ use lightning:: util:: events:: { EventHandler , EventsProvider } ;
17
18
use lightning:: util:: logger:: Logger ;
18
19
use std:: sync:: Arc ;
19
20
use std:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -109,11 +110,12 @@ impl BackgroundProcessor {
109
110
Descriptor : ' static + SocketDescriptor + Send + Sync ,
110
111
CMH : ' static + Deref + Send + Sync ,
111
112
RMH : ' static + Deref + Send + Sync ,
113
+ EH : ' static + EventHandler + Send + Sync ,
112
114
CMP : ' static + Send + ChannelManagerPersister < Signer , M , T , K , F , L > ,
113
115
CM : ' static + Deref < Target = ChannelManager < Signer , M , T , K , F , L > > + Send + Sync ,
114
116
PM : ' static + Deref < Target = PeerManager < Descriptor , CMH , RMH , L > > + Send + Sync ,
115
117
>
116
- ( handler : CMP , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
118
+ ( persister : CMP , event_handler : EH , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
117
119
where
118
120
M :: Target : ' static + chain:: Watch < Signer > ,
119
121
T :: Target : ' static + BroadcasterInterface ,
@@ -129,10 +131,11 @@ impl BackgroundProcessor {
129
131
let mut current_time = Instant :: now ( ) ;
130
132
loop {
131
133
peer_manager. process_events ( ) ;
134
+ channel_manager. process_pending_events ( & event_handler) ;
132
135
let updates_available =
133
136
channel_manager. await_persistable_update_timeout ( Duration :: from_millis ( 100 ) ) ;
134
137
if updates_available {
135
- handler . persist_manager ( & * channel_manager) ?;
138
+ persister . persist_manager ( & * channel_manager) ?;
136
139
}
137
140
// Exit the loop if the background processor was requested to stop.
138
141
if stop_thread. load ( Ordering :: Acquire ) == true {
@@ -281,8 +284,9 @@ mod tests {
281
284
282
285
// Initiate the background processors to watch each node.
283
286
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
284
- let callback = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
285
- let bg_processor = BackgroundProcessor :: start ( callback, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
287
+ let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
288
+ let event_handler = |_| { } ;
289
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
286
290
287
291
// Go through the channel creation process until each node should have something persisted.
288
292
let tx = open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
@@ -336,8 +340,9 @@ mod tests {
336
340
// `FRESHNESS_TIMER`.
337
341
let nodes = create_nodes ( 1 , "test_timer_tick_called" . to_string ( ) ) ;
338
342
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
339
- let callback = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
340
- let bg_processor = BackgroundProcessor :: start ( callback, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
343
+ let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
344
+ let event_handler = |_| { } ;
345
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
341
346
loop {
342
347
let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
343
348
let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred" . to_string ( ) ;
@@ -364,7 +369,8 @@ mod tests {
364
369
}
365
370
366
371
let nodes = create_nodes ( 2 , "test_persist_error" . to_string ( ) ) ;
367
- let bg_processor = BackgroundProcessor :: start ( persist_manager, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
372
+ let event_handler = |_| { } ;
373
+ let bg_processor = BackgroundProcessor :: start ( persist_manager, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
368
374
open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
369
375
370
376
let _ = bg_processor. thread_handle . join ( ) . unwrap ( ) . expect_err ( "Errored persisting manager: test" ) ;
0 commit comments