@@ -14,6 +14,7 @@ use lightning::chain::keysinterface::{Sign, KeysInterface};
14
14
use lightning:: ln:: channelmanager:: ChannelManager ;
15
15
use lightning:: ln:: msgs:: { ChannelMessageHandler , RoutingMessageHandler } ;
16
16
use lightning:: ln:: peer_handler:: { PeerManager , SocketDescriptor } ;
17
+ use lightning:: util:: events:: { EventHandler , EventsProvider } ;
17
18
use lightning:: util:: logger:: Logger ;
18
19
use std:: sync:: Arc ;
19
20
use std:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -109,11 +110,12 @@ impl BackgroundProcessor {
109
110
Descriptor : ' static + SocketDescriptor + Send + Sync ,
110
111
CMH : ' static + Deref + Send + Sync ,
111
112
RMH : ' static + Deref + Send + Sync ,
113
+ EH : ' static + EventHandler + Send + Sync ,
112
114
CMP : ' static + Send + ChannelManagerPersister < Signer , M , T , K , F , L > ,
113
115
CM : ' static + Deref < Target = ChannelManager < Signer , M , T , K , F , L > > + Send + Sync ,
114
116
PM : ' static + Deref < Target = PeerManager < Descriptor , CMH , RMH , L > > + Send + Sync ,
115
117
>
116
- ( handler : CMP , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
118
+ ( persister : CMP , event_handler : EH , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
117
119
where
118
120
M :: Target : ' static + chain:: Watch < Signer > ,
119
121
T :: Target : ' static + BroadcasterInterface ,
@@ -129,10 +131,11 @@ impl BackgroundProcessor {
129
131
let mut current_time = Instant :: now ( ) ;
130
132
loop {
131
133
peer_manager. process_events ( ) ;
134
+ channel_manager. process_pending_events ( & event_handler) ;
132
135
let updates_available =
133
136
channel_manager. await_persistable_update_timeout ( Duration :: from_millis ( 100 ) ) ;
134
137
if updates_available {
135
- handler . persist_manager ( & * channel_manager) ?;
138
+ persister . persist_manager ( & * channel_manager) ?;
136
139
}
137
140
// Exit the loop if the background processor was requested to stop.
138
141
if stop_thread. load ( Ordering :: Acquire ) == true {
@@ -162,10 +165,8 @@ mod tests {
162
165
use bitcoin:: blockdata:: constants:: genesis_block;
163
166
use bitcoin:: blockdata:: transaction:: { Transaction , TxOut } ;
164
167
use bitcoin:: network:: constants:: Network ;
165
- use lightning:: chain;
166
- use lightning:: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
167
168
use lightning:: chain:: chainmonitor;
168
- use lightning:: chain:: keysinterface:: { Sign , InMemorySigner , KeysInterface , KeysManager } ;
169
+ use lightning:: chain:: keysinterface:: { InMemorySigner , KeysInterface , KeysManager } ;
169
170
use lightning:: chain:: transaction:: OutPoint ;
170
171
use lightning:: get_event_msg;
171
172
use lightning:: ln:: channelmanager:: { BestBlock , ChainParameters , ChannelManager , SimpleArcChannelManager } ;
@@ -174,15 +175,14 @@ mod tests {
174
175
use lightning:: ln:: peer_handler:: { PeerManager , MessageHandler , SocketDescriptor } ;
175
176
use lightning:: util:: config:: UserConfig ;
176
177
use lightning:: util:: events:: { Event , MessageSendEventsProvider , MessageSendEvent } ;
177
- use lightning:: util:: logger:: Logger ;
178
178
use lightning:: util:: ser:: Writeable ;
179
179
use lightning:: util:: test_utils;
180
180
use lightning_persister:: FilesystemPersister ;
181
181
use std:: fs;
182
182
use std:: path:: PathBuf ;
183
183
use std:: sync:: { Arc , Mutex } ;
184
184
use std:: time:: Duration ;
185
- use super :: BackgroundProcessor ;
185
+ use super :: { BackgroundProcessor , FRESHNESS_TIMER } ;
186
186
187
187
#[ derive( Clone , Eq , Hash , PartialEq ) ]
188
188
struct TestDescriptor { }
@@ -246,13 +246,27 @@ mod tests {
246
246
}
247
247
248
248
macro_rules! open_channel {
249
+ ( $node_a: expr, $node_b: expr, $channel_value: expr) => { {
250
+ begin_open_channel!( $node_a, $node_b, $channel_value) ;
251
+ let events = $node_a. node. get_and_clear_pending_events( ) ;
252
+ assert_eq!( events. len( ) , 1 ) ;
253
+ let ( temporary_channel_id, tx) = handle_funding_generation_ready!( events[ 0 ] , $channel_value) ;
254
+ end_open_channel!( $node_a, $node_b, temporary_channel_id, tx) ;
255
+ tx
256
+ } }
257
+ }
258
+
259
+ macro_rules! begin_open_channel {
249
260
( $node_a: expr, $node_b: expr, $channel_value: expr) => { {
250
261
$node_a. node. create_channel( $node_b. node. get_our_node_id( ) , $channel_value, 100 , 42 , None ) . unwrap( ) ;
251
262
$node_b. node. handle_open_channel( & $node_a. node. get_our_node_id( ) , InitFeatures :: known( ) , & get_event_msg!( $node_a, MessageSendEvent :: SendOpenChannel , $node_b. node. get_our_node_id( ) ) ) ;
252
263
$node_a. node. handle_accept_channel( & $node_b. node. get_our_node_id( ) , InitFeatures :: known( ) , & get_event_msg!( $node_b, MessageSendEvent :: SendAcceptChannel , $node_a. node. get_our_node_id( ) ) ) ;
253
- let events = $node_a. node. get_and_clear_pending_events( ) ;
254
- assert_eq!( events. len( ) , 1 ) ;
255
- let ( temporary_channel_id, tx) = match events[ 0 ] {
264
+ } }
265
+ }
266
+
267
+ macro_rules! handle_funding_generation_ready {
268
+ ( $event: expr, $channel_value: expr) => { {
269
+ match $event {
256
270
Event :: FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => {
257
271
assert_eq!( * channel_value_satoshis, $channel_value) ;
258
272
assert_eq!( user_channel_id, 42 ) ;
@@ -263,12 +277,15 @@ mod tests {
263
277
( * temporary_channel_id, tx)
264
278
} ,
265
279
_ => panic!( "Unexpected event" ) ,
266
- } ;
280
+ }
281
+ } }
282
+ }
267
283
268
- $node_a. node. funding_transaction_generated( & temporary_channel_id, tx. clone( ) ) . unwrap( ) ;
284
+ macro_rules! end_open_channel {
285
+ ( $node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => { {
286
+ $node_a. node. funding_transaction_generated( & $temporary_channel_id, $tx. clone( ) ) . unwrap( ) ;
269
287
$node_b. node. handle_funding_created( & $node_a. node. get_our_node_id( ) , & get_event_msg!( $node_a, MessageSendEvent :: SendFundingCreated , $node_b. node. get_our_node_id( ) ) ) ;
270
288
$node_a. node. handle_funding_signed( & $node_b. node. get_our_node_id( ) , & get_event_msg!( $node_b, MessageSendEvent :: SendFundingSigned , $node_a. node. get_our_node_id( ) ) ) ;
271
- tx
272
289
} }
273
290
}
274
291
@@ -279,13 +296,16 @@ mod tests {
279
296
// re-persistence and is successfully re-persisted.
280
297
let nodes = create_nodes ( 2 , "test_background_processor" . to_string ( ) ) ;
281
298
299
+ // Go through the channel creation process so that each node has something to persist. Since
300
+ // open_channel consumes events, it must complete before starting BackgroundProcessor to
301
+ // avoid a race with processing events.
302
+ let tx = open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
303
+
282
304
// Initiate the background processors to watch each node.
283
305
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
284
- let callback = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
285
- let bg_processor = BackgroundProcessor :: start ( callback, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
286
-
287
- // Go through the channel creation process until each node should have something persisted.
288
- let tx = open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
306
+ let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
307
+ let event_handler = |_| { } ;
308
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
289
309
290
310
macro_rules! check_persisted_data {
291
311
( $node: expr, $filepath: expr, $expected_bytes: expr) => {
@@ -336,8 +356,9 @@ mod tests {
336
356
// `FRESHNESS_TIMER`.
337
357
let nodes = create_nodes ( 1 , "test_timer_tick_called" . to_string ( ) ) ;
338
358
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
339
- let callback = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
340
- let bg_processor = BackgroundProcessor :: start ( callback, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
359
+ let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
360
+ let event_handler = |_| { } ;
361
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
341
362
loop {
342
363
let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
343
364
let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred" . to_string ( ) ;
@@ -352,21 +373,37 @@ mod tests {
352
373
#[ test]
353
374
fn test_persist_error ( ) {
354
375
// Test that if we encounter an error during manager persistence, the thread panics.
355
- fn persist_manager < Signer , M , T , K , F , L > ( _data : & ChannelManager < Signer , Arc < M > , Arc < T > , Arc < K > , Arc < F > , Arc < L > > ) -> Result < ( ) , std:: io:: Error >
356
- where Signer : ' static + Sign ,
357
- M : ' static + chain:: Watch < Signer > ,
358
- T : ' static + BroadcasterInterface ,
359
- K : ' static + KeysInterface < Signer =Signer > ,
360
- F : ' static + FeeEstimator ,
361
- L : ' static + Logger ,
362
- {
363
- Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "test" ) )
364
- }
365
-
366
376
let nodes = create_nodes ( 2 , "test_persist_error" . to_string ( ) ) ;
367
- let bg_processor = BackgroundProcessor :: start ( persist_manager, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
368
377
open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
369
378
379
+ let persister = |_: & _ | Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "test" ) ) ;
380
+ let event_handler = |_| { } ;
381
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
370
382
let _ = bg_processor. thread_handle . join ( ) . unwrap ( ) . expect_err ( "Errored persisting manager: test" ) ;
371
383
}
384
+
385
+ #[ test]
386
+ fn test_background_event_handling ( ) {
387
+ let nodes = create_nodes ( 2 , "test_background_event_handling" . to_string ( ) ) ;
388
+ let channel_value = 100000 ;
389
+ let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
390
+ let persister = move |node : & _ | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
391
+
392
+ // Set up a background event handler for FundingGenerationReady events.
393
+ let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
394
+ let event_handler = move |event| {
395
+ sender. send ( handle_funding_generation_ready ! ( event, channel_value) ) . unwrap ( ) ;
396
+ } ;
397
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
398
+
399
+ // Open a channel and check that the FundingGenerationReady event was handled.
400
+ begin_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , channel_value) ;
401
+ let timeout = Duration :: from_secs ( 5 * FRESHNESS_TIMER ) ;
402
+ let ( temporary_channel_id, tx) = receiver
403
+ . recv_timeout ( timeout)
404
+ . expect ( "FundingGenerationReady not handled within deadline" ) ;
405
+ end_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , temporary_channel_id, tx) ;
406
+
407
+ assert ! ( bg_processor. stop( ) . is_ok( ) ) ;
408
+ }
372
409
}
0 commit comments